Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [6/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Mon Apr 16 14:44:35 2007
@@ -75,18 +75,18 @@
   }
 
   DatanodeInfo( DatanodeID nodeID ) {
-      super( nodeID );
-      this.capacity = 0L;
-      this.remaining = 0L;
-      this.lastUpdate = 0L;
-      this.xceiverCount = 0;
-      this.adminState = null;    
+    super( nodeID );
+    this.capacity = 0L;
+    this.remaining = 0L;
+    this.lastUpdate = 0L;
+    this.xceiverCount = 0;
+    this.adminState = null;    
   }
   
   DatanodeInfo( DatanodeID nodeID, String location, String hostName ) {
-      this(nodeID);
-      this.location = location;
-      this.hostName = hostName;
+    this(nodeID);
+    this.location = location;
+    this.hostName = hostName;
   }
   
   /** The raw capacity. */
@@ -130,7 +130,7 @@
   }
   
   public String getPath() {
-      return location+NodeBase.PATH_SEPARATOR_STR+name;
+    return location+NodeBase.PATH_SEPARATOR_STR+name;
   }
 
   
@@ -150,7 +150,7 @@
     long u = c - r;
     buffer.append("Name: "+name+"\n");
     if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
-        buffer.append("Rack: "+location+"\n");
+      buffer.append("Rack: "+location+"\n");
     }
     if (isDecommissioned()) {
       buffer.append("State          : Decommissioned\n");
@@ -174,7 +174,7 @@
     long u = c - r;
     buffer.append(name);
     if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
-        buffer.append(" "+location);
+      buffer.append(" "+location);
     }
     if (isDecommissioned()) {
       buffer.append(" DD");
@@ -209,51 +209,51 @@
   /**
    * Returns true if the node is in the process of being decommissioned
    */
-   boolean isDecommissionInProgress() {
-     if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
-       return true;
-     }
-     return false;
-   }
+  boolean isDecommissionInProgress() {
+    if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Returns true if the node has been decommissioned.
    */
-   boolean isDecommissioned() {
-     if (adminState == AdminStates.DECOMMISSIONED) {
-       return true;
-     }
-     return false;
-   }
+  boolean isDecommissioned() {
+    if (adminState == AdminStates.DECOMMISSIONED) {
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Sets the admin state to indicate that decommision is complete.
    */
-   void setDecommissioned() {
-     adminState = AdminStates.DECOMMISSIONED;
-   }
-
-   /**
-    * Retrieves the admin state of this node.
-    */
-    AdminStates getAdminState() {
-      if (adminState == null) {
-        return AdminStates.NORMAL;
-      }
-      return adminState;
-    }
-
-   /**
-    * Sets the admin state of this node.
-    */
-    void setAdminState(AdminStates newState) {
-      if (newState == AdminStates.NORMAL) {
-        adminState = null;
-      }
-      else {
-        adminState = newState;
-      }
+  void setDecommissioned() {
+    adminState = AdminStates.DECOMMISSIONED;
+  }
+
+  /**
+   * Retrieves the admin state of this node.
+   */
+  AdminStates getAdminState() {
+    if (adminState == null) {
+      return AdminStates.NORMAL;
     }
+    return adminState;
+  }
+
+  /**
+   * Sets the admin state of this node.
+   */
+  void setAdminState(AdminStates newState) {
+    if (newState == AdminStates.NORMAL) {
+      adminState = null;
+    }
+    else {
+      adminState = newState;
+    }
+  }
 
   private int level; //which level of the tree the node resides
   private Node parent; //its parent
@@ -301,7 +301,7 @@
     this.xceiverCount = in.readInt();
     this.location = Text.readString( in );
     AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
-                                         AdminStates.class);
+                                                                AdminStates.class);
     setAdminState(newState);
   }
 }
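
The reformatted accessors above preserve an implicit convention: a null adminState stands for AdminStates.NORMAL, and getAdminState()/setAdminState() translate between the two. A small hedged sketch of how a caller in org.apache.hadoop.dfs (these members are package-private) might exercise the decommission states; the node argument and method name are hypothetical, and AdminStates is referenced unqualified exactly as in the diff:

  // Hypothetical caller; uses only the accessors shown in the diff above.
  void walkAdminStates(DatanodeInfo node) {
    // A freshly constructed node stores null but reports NORMAL.
    assert node.getAdminState() == AdminStates.NORMAL;

    node.setAdminState(AdminStates.DECOMMISSION_INPROGRESS);
    assert node.isDecommissionInProgress() && !node.isDecommissioned();

    node.setDecommissioned();
    assert node.isDecommissioned();

    // Setting NORMAL clears the field back to null internally.
    node.setAdminState(AdminStates.NORMAL);
    assert node.getAdminState() == AdminStates.NORMAL;
  }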

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Apr 16 14:44:35 2007
@@ -64,47 +64,47 @@
    * new storageID if the datanode did not have one and
    * registration ID for further communication.
    */
-    public DatanodeRegistration register( DatanodeRegistration registration,
-                                          String networkLocation
+  public DatanodeRegistration register( DatanodeRegistration registration,
+                                        String networkLocation
                                         ) throws IOException;
-    /**
-     * sendHeartbeat() tells the NameNode that the DataNode is still
-     * alive and well.  Includes some status info, too. 
-     * It also gives the NameNode a chance to return a "DatanodeCommand" object.
-     * A DatanodeCommand tells the DataNode to invalidate local block(s), 
-     * or to copy them to other DataNodes, etc.
-     */
-    public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
-                                          long capacity, long remaining,
-                                          int xmitsInProgress,
-                                          int xceiverCount) throws IOException;
+  /**
+   * sendHeartbeat() tells the NameNode that the DataNode is still
+   * alive and well.  Includes some status info, too. 
+   * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+   * A DatanodeCommand tells the DataNode to invalidate local block(s), 
+   * or to copy them to other DataNodes, etc.
+   */
+  public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
+                                        long capacity, long remaining,
+                                        int xmitsInProgress,
+                                        int xceiverCount) throws IOException;
 
-    /**
-     * blockReport() tells the NameNode about all the locally-stored blocks.
-     * The NameNode returns an array of Blocks that have become obsolete
-     * and should be deleted.  This function is meant to upload *all*
-     * the locally-stored blocks.  It's invoked upon startup and then
-     * infrequently afterwards.
-     */
-    public DatanodeCommand blockReport( DatanodeRegistration registration,
-                                        Block blocks[]) throws IOException;
+  /**
+   * blockReport() tells the NameNode about all the locally-stored blocks.
+   * The NameNode returns an array of Blocks that have become obsolete
+   * and should be deleted.  This function is meant to upload *all*
+   * the locally-stored blocks.  It's invoked upon startup and then
+   * infrequently afterwards.
+   */
+  public DatanodeCommand blockReport( DatanodeRegistration registration,
+                                      Block blocks[]) throws IOException;
     
-    /**
-     * blockReceived() allows the DataNode to tell the NameNode about
-     * recently-received block data.  For example, whenever client code
-     * writes a new Block here, or another DataNode copies a Block to
-     * this DataNode, it will call blockReceived().
-     */
-    public void blockReceived(DatanodeRegistration registration,
-                              Block blocks[]) throws IOException;
+  /**
+   * blockReceived() allows the DataNode to tell the NameNode about
+   * recently-received block data.  For example, whenever client code
+   * writes a new Block here, or another DataNode copies a Block to
+   * this DataNode, it will call blockReceived().
+   */
+  public void blockReceived(DatanodeRegistration registration,
+                            Block blocks[]) throws IOException;
 
-    /**
-     * errorReport() tells the NameNode about something that has gone
-     * awry.  Useful for debugging.
-     */
-    public void errorReport(DatanodeRegistration registration,
-                            int errorCode, 
-                            String msg) throws IOException;
+  /**
+   * errorReport() tells the NameNode about something that has gone
+   * awry.  Useful for debugging.
+   */
+  public void errorReport(DatanodeRegistration registration,
+                          int errorCode, 
+                          String msg) throws IOException;
     
-    public NamespaceInfo versionRequest() throws IOException;
+  public NamespaceInfo versionRequest() throws IOException;
 }
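
The javadoc in this interface describes the DataNode side of the conversation: register once, send an initial blockReport, then heartbeat periodically and act on any DatanodeCommand that comes back. Below is a hedged sketch of that loop using only the methods declared above; the registration object, network location string, capacity figures, and the 3-second pause (FSConstants.HEARTBEAT_INTERVAL is 3, presumably seconds) are placeholders, not actual DataNode code:

  // Illustrative caller in org.apache.hadoop.dfs; not part of this commit.
  void serviceLoop(DatanodeProtocol namenode,
                   DatanodeRegistration reg,
                   Block[] localBlocks) throws IOException, InterruptedException {
    // One-time handshake; the NameNode may hand back an updated registration.
    reg = namenode.register(reg, "/default-rack" /* placeholder location */);

    // Full report of locally stored blocks; obsolete blocks come back
    // wrapped in a DatanodeCommand.
    DatanodeCommand cmd = namenode.blockReport(reg, localBlocks);

    while (true) {
      // Periodic heartbeat with status; the reply may tell this DataNode
      // to invalidate or transfer blocks (handling omitted).
      cmd = namenode.sendHeartbeat(reg,
                                   /* capacity  */ 100L << 30,
                                   /* remaining */  40L << 30,
                                   /* xmitsInProgress */ 0,
                                   /* xceiverCount    */ 0);
      if (cmd != null) {
        // act on the command, or call errorReport(...) if something goes wrong
      }
      Thread.sleep(3 * 1000L);
    }
  }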

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java Mon Apr 16 14:44:35 2007
@@ -26,29 +26,29 @@
  * a file with the names in a directory listing to make accesses faster.
  */
 class DfsPath extends Path {
-    DFSFileInfo info;
+  DFSFileInfo info;
 
-    public DfsPath(DFSFileInfo info) {
-        super(info.getPath());
-        this.info = info;
-    }
+  public DfsPath(DFSFileInfo info) {
+    super(info.getPath());
+    this.info = info;
+  }
 
-    public boolean isDirectory() {
-        return info.isDir();
-    }
-    public boolean isFile() {
-        return ! isDirectory();
-    }
-    public long length() {
-        return info.getLen();
-    }
-    public long getContentsLength() {
-        return info.getContentsLen();
-    }
-    public short getReplication() {
-      return info.getReplication();
-    }
-    public long getBlockSize() {
-      return info.getBlockSize();
-    }
+  public boolean isDirectory() {
+    return info.isDir();
+  }
+  public boolean isFile() {
+    return ! isDirectory();
+  }
+  public long length() {
+    return info.getLen();
+  }
+  public long getContentsLength() {
+    return info.getContentsLen();
+  }
+  public short getReplication() {
+    return info.getReplication();
+  }
+  public long getBlockSize() {
+    return info.getBlockSize();
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon Apr 16 14:44:35 2007
@@ -34,9 +34,9 @@
  * @author Mike Cafarella
  *****************************************************************/
 public class DistributedFileSystem extends ChecksumFileSystem {
-    private static class RawDistributedFileSystem extends FileSystem {
+  private static class RawDistributedFileSystem extends FileSystem {
     private Path workingDir =
-        new Path("/user", System.getProperty("user.name")); 
+      new Path("/user", System.getProperty("user.name")); 
     private URI uri;
     private FileSystem localFs;
 
@@ -48,7 +48,7 @@
 
     /** @deprecated */
     public RawDistributedFileSystem(InetSocketAddress namenode,
-                                 Configuration conf) throws IOException {
+                                    Configuration conf) throws IOException {
       initialize(URI.create("hdfs://"+
                             namenode.getHostName()+":"+
                             namenode.getPort()),
@@ -111,8 +111,8 @@
       String result = makeAbsolute(file).toUri().getPath();
       if (!FSNamesystem.isValidName(result)) {
         throw new IllegalArgumentException("Pathname " + result + " from " +
-                                            file +
-                                            " is not a valid DFS filename.");
+                                           file +
+                                           " is not a valid DFS filename.");
       }
       return new UTF8(result);
     }
@@ -130,10 +130,10 @@
     }
 
     public FSDataOutputStream create(Path f, boolean overwrite,
-            int bufferSize, short replication, long blockSize,
-            Progressable progress) throws IOException {
+                                     int bufferSize, short replication, long blockSize,
+                                     Progressable progress) throws IOException {
       if (exists(f) && ! overwrite) {
-         throw new IOException("File already exists:"+f);
+        throw new IOException("File already exists:"+f);
       }
       Path parent = f.getParent();
       if (parent != null && !mkdirs(parent)) {
@@ -141,14 +141,14 @@
       }
       
       return new FSDataOutputStream(
-           dfs.create(getPath(f), overwrite,
-                   replication, blockSize, progress),
-           bufferSize);
+                                    dfs.create(getPath(f), overwrite,
+                                               replication, blockSize, progress),
+                                    bufferSize);
     }
     
     public boolean setReplication( Path src, 
-                                      short replication
-                                    ) throws IOException {
+                                   short replication
+                                   ) throws IOException {
       return dfs.setReplication(getPath(src), replication);
     }
     
@@ -163,36 +163,36 @@
      * Get rid of Path f, whether a true file or dir.
      */
     public boolean delete(Path f) throws IOException {
-        return dfs.delete(getPath(f));
+      return dfs.delete(getPath(f));
     }
 
     public boolean exists(Path f) throws IOException {
-        return dfs.exists(getPath(f));
+      return dfs.exists(getPath(f));
     }
 
     public boolean isDirectory(Path f) throws IOException {
-        if (f instanceof DfsPath) {
-          return ((DfsPath)f).isDirectory();
-        }
-        return dfs.isDirectory(getPath(f));
+      if (f instanceof DfsPath) {
+        return ((DfsPath)f).isDirectory();
+      }
+      return dfs.isDirectory(getPath(f));
     }
 
     public long getLength(Path f) throws IOException {
-        if (f instanceof DfsPath) {
-          return ((DfsPath)f).length();
-        }
+      if (f instanceof DfsPath) {
+        return ((DfsPath)f).length();
+      }
 
-        DFSFileInfo info[] = dfs.listPaths(getPath(f));
-        return (info == null) ? 0 : info[0].getLen();
+      DFSFileInfo info[] = dfs.listPaths(getPath(f));
+      return (info == null) ? 0 : info[0].getLen();
     }
 
     public long getContentLength(Path f) throws IOException {
-        if (f instanceof DfsPath) {
-            return ((DfsPath)f).getContentsLength();
-          }
+      if (f instanceof DfsPath) {
+        return ((DfsPath)f).getContentsLength();
+      }
 
-          DFSFileInfo info[] = dfs.listPaths(getPath(f));
-          return (info == null) ? 0 : info[0].getContentsLen();
+      DFSFileInfo info[] = dfs.listPaths(getPath(f));
+      return (info == null) ? 0 : info[0].getContentsLen();
     }
 
     public short getReplication(Path f) throws IOException {
@@ -202,44 +202,44 @@
 
       DFSFileInfo info[] = dfs.listPaths(getPath(f));
       return info[0].getReplication();
-  }
+    }
 
     public Path[] listPaths(Path f) throws IOException {
-        DFSFileInfo info[] = dfs.listPaths(getPath(f));
-        if (info == null) {
-            return new Path[0];
-        } else {
-            Path results[] = new DfsPath[info.length];
-            for (int i = 0; i < info.length; i++) {
-                results[i] = new DfsPath(info[i]);
-            }
-            return results;
+      DFSFileInfo info[] = dfs.listPaths(getPath(f));
+      if (info == null) {
+        return new Path[0];
+      } else {
+        Path results[] = new DfsPath[info.length];
+        for (int i = 0; i < info.length; i++) {
+          results[i] = new DfsPath(info[i]);
         }
+        return results;
+      }
     }
 
     public boolean mkdirs(Path f) throws IOException {
-        return dfs.mkdirs(getPath(f));
+      return dfs.mkdirs(getPath(f));
     }
 
     /** @deprecated */ @Deprecated
-    public void lock(Path f, boolean shared) throws IOException {
-        dfs.lock(getPath(f), ! shared);
+      public void lock(Path f, boolean shared) throws IOException {
+      dfs.lock(getPath(f), ! shared);
     }
 
     /** @deprecated */ @Deprecated
-    public void release(Path f) throws IOException {
-        dfs.release(getPath(f));
+      public void release(Path f) throws IOException {
+      dfs.release(getPath(f));
     }
 
     @Override
-    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
-    throws IOException {
+      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+      throws IOException {
       FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
     }
 
     @Override
-    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
-    throws IOException {
+      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+      throws IOException {
       FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
     }
 
@@ -257,27 +257,27 @@
     }
 
     public void close() throws IOException {
-        super.close();
-        dfs.close();
+      super.close();
+      dfs.close();
     }
 
     public String toString() {
-        return "DFS[" + dfs + "]";
+      return "DFS[" + dfs + "]";
     }
 
     DFSClient getClient() {
-        return dfs;
+      return dfs;
     }        
     /** Return the total raw capacity of the filesystem, disregarding
      * replication .*/
     public long getRawCapacity() throws IOException{
-        return dfs.totalRawCapacity();
+      return dfs.totalRawCapacity();
     }
 
     /** Return the total raw used space in the filesystem, disregarding
      * replication .*/
     public long getRawUsed() throws IOException{
-        return dfs.totalRawUsed();
+      return dfs.totalRawUsed();
     }
 
     /** Return statistics for each datanode. */
@@ -291,7 +291,7 @@
      * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
      */
     public boolean setSafeMode( FSConstants.SafeModeAction action ) 
-    throws IOException {
+      throws IOException {
       return dfs.setSafeMode( action );
     }
 
@@ -325,8 +325,8 @@
      * we can consider figuring out exactly which block is corrupt.
      */
     public boolean reportChecksumFailure(Path f, 
-                                      FSDataInputStream in, long inPos, 
-                                      FSDataInputStream sums, long sumsPos) {
+                                         FSDataInputStream in, long inPos, 
+                                         FSDataInputStream sums, long sumsPos) {
       
       LocatedBlock lblocks[] = new LocatedBlock[2];
 
@@ -365,81 +365,81 @@
 
       return true;
     }
-    }
+  }
 
-    public DistributedFileSystem() {
-        super( new RawDistributedFileSystem() );
-    }
+  public DistributedFileSystem() {
+    super( new RawDistributedFileSystem() );
+  }
 
-    /** @deprecated */
-    public DistributedFileSystem(InetSocketAddress namenode,
-                                 Configuration conf) throws IOException {
-      super( new RawDistributedFileSystem(namenode, conf) );
-    }
+  /** @deprecated */
+  public DistributedFileSystem(InetSocketAddress namenode,
+                               Configuration conf) throws IOException {
+    super( new RawDistributedFileSystem(namenode, conf) );
+  }
 
-    @Override
+  @Override
     public long getContentLength(Path f) throws IOException {
-      return fs.getContentLength(f);
-    }
+    return fs.getContentLength(f);
+  }
 
-    /** Return the total raw capacity of the filesystem, disregarding
-     * replication .*/
-    public long getRawCapacity() throws IOException{
-        return ((RawDistributedFileSystem)fs).getRawCapacity();
-    }
+  /** Return the total raw capacity of the filesystem, disregarding
+   * replication .*/
+  public long getRawCapacity() throws IOException{
+    return ((RawDistributedFileSystem)fs).getRawCapacity();
+  }
 
-    /** Return the total raw used space in the filesystem, disregarding
-     * replication .*/
-    public long getRawUsed() throws IOException{
-        return ((RawDistributedFileSystem)fs).getRawUsed();
-    }
+  /** Return the total raw used space in the filesystem, disregarding
+   * replication .*/
+  public long getRawUsed() throws IOException{
+    return ((RawDistributedFileSystem)fs).getRawUsed();
+  }
 
-    /** Return statistics for each datanode. */
-    public DatanodeInfo[] getDataNodeStats() throws IOException {
-      return ((RawDistributedFileSystem)fs).getDataNodeStats();
-    }
+  /** Return statistics for each datanode. */
+  public DatanodeInfo[] getDataNodeStats() throws IOException {
+    return ((RawDistributedFileSystem)fs).getDataNodeStats();
+  }
     
-    /**
-     * Enter, leave or get safe mode.
-     *  
-     * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
-     */
-    public boolean setSafeMode( FSConstants.SafeModeAction action ) 
+  /**
+   * Enter, leave or get safe mode.
+   *  
+   * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+   */
+  public boolean setSafeMode( FSConstants.SafeModeAction action ) 
     throws IOException {
-      return ((RawDistributedFileSystem)fs).setSafeMode( action );
-    }
+    return ((RawDistributedFileSystem)fs).setSafeMode( action );
+  }
 
-    /*
-     * Refreshes the list of hosts and excluded hosts from the configured 
-     * files.  
-     */
-    public void refreshNodes() throws IOException {
-      ((RawDistributedFileSystem)fs).refreshNodes();
-    }
+  /*
+   * Refreshes the list of hosts and excluded hosts from the configured 
+   * files.  
+   */
+  public void refreshNodes() throws IOException {
+    ((RawDistributedFileSystem)fs).refreshNodes();
+  }
 
-    /**
-     * Finalize previously upgraded files system state.
-     */
-    public void finalizeUpgrade() throws IOException {
-      ((RawDistributedFileSystem)fs).finalizeUpgrade();
-    }
+  /**
+   * Finalize previously upgraded files system state.
+   */
+  public void finalizeUpgrade() throws IOException {
+    ((RawDistributedFileSystem)fs).finalizeUpgrade();
+  }
 
-    /*
-     * Dumps dfs data structures into specified file.
-     */
-     public void metaSave(String pathname) throws IOException {
-       ((RawDistributedFileSystem)fs).metaSave(pathname);
-     }
+  /*
+   * Dumps dfs data structures into specified file.
+   */
+  public void metaSave(String pathname) throws IOException {
+    ((RawDistributedFileSystem)fs).metaSave(pathname);
+  }
 
-    /**
-     * We need to find the blocks that didn't match.  Likely only one 
-     * is corrupt but we will report both to the namenode.  In the future,
-     * we can consider figuring out exactly which block is corrupt.
-     */
-    public boolean reportChecksumFailure(Path f, 
-                                      FSDataInputStream in, long inPos, 
-                                      FSDataInputStream sums, long sumsPos) {
-      return ((RawDistributedFileSystem)fs).reportChecksumFailure(
-                f, in, inPos, sums, sumsPos);
-    }
+  /**
+   * We need to find the blocks that didn't match.  Likely only one 
+   * is corrupt but we will report both to the namenode.  In the future,
+   * we can consider figuring out exactly which block is corrupt.
+   */
+  public boolean reportChecksumFailure(Path f, 
+                                       FSDataInputStream in, long inPos, 
+                                       FSDataInputStream sums, long sumsPos) {
+    return ((RawDistributedFileSystem)fs).reportChecksumFailure(
+                                                                f, in, inPos, sums, sumsPos);
+  }
 }
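
Together with the DfsPath change earlier in this commit, the reformatted isDirectory/getLength/getContentLength bodies show the listing cache at work: listPaths wraps each DFSFileInfo in a DfsPath, and later queries check f instanceof DfsPath so they can answer from the cached info instead of going back to the NameNode. A hedged sketch of the effect, written against the FileSystem API of this revision (listPaths/getLength/isDirectory were later superseded); the method name and directory argument are hypothetical:

  // Illustrative only: the per-entry queries below are served from the
  // DFSFileInfo cached inside each DfsPath returned by listPaths, so they
  // need no additional NameNode round trip.
  void listAndStat(FileSystem fs, Path dir) throws IOException {
    Path[] entries = fs.listPaths(dir);
    for (Path p : entries) {
      System.out.println(p + " dir=" + fs.isDirectory(p)
                         + " len=" + fs.getLength(p));
    }
  }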

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Apr 16 14:44:35 2007
@@ -25,120 +25,120 @@
  * @author Mike Cafarella
  ************************************/
 public interface FSConstants {
-    public static int MIN_BLOCKS_FOR_WRITE = 5;
+  public static int MIN_BLOCKS_FOR_WRITE = 5;
 
-    public static final long WRITE_COMPLETE = 0xcafae11a;
+  public static final long WRITE_COMPLETE = 0xcafae11a;
 
-    //
-    // IPC Opcodes 
-    //
-    // Processed at namenode
-    public static final byte OP_ERROR = (byte) 0;
-    public static final byte OP_HEARTBEAT = (byte) 1;
-    public static final byte OP_BLOCKRECEIVED = (byte) 2;
-    public static final byte OP_BLOCKREPORT = (byte) 3;
-    public static final byte OP_TRANSFERDATA = (byte) 4;
-
-    // Processed at namenode, from client
-    public static final byte OP_CLIENT_OPEN = (byte) 20;
-    public static final byte OP_CLIENT_STARTFILE = (byte) 21;
-    public static final byte OP_CLIENT_ADDBLOCK = (byte) 22;
-    public static final byte OP_CLIENT_RENAMETO = (byte) 23;
-    public static final byte OP_CLIENT_DELETE = (byte) 24;  
-    public static final byte OP_CLIENT_COMPLETEFILE = (byte) 25;
-    public static final byte OP_CLIENT_LISTING = (byte) 26;
-    public static final byte OP_CLIENT_OBTAINLOCK = (byte) 27;
-    public static final byte OP_CLIENT_RELEASELOCK = (byte) 28;
-    public static final byte OP_CLIENT_EXISTS = (byte) 29;
-    public static final byte OP_CLIENT_ISDIR = (byte) 30;
-    public static final byte OP_CLIENT_MKDIRS = (byte) 31;
-    public static final byte OP_CLIENT_RENEW_LEASE = (byte) 32;
-    public static final byte OP_CLIENT_ABANDONBLOCK = (byte) 33;
-    public static final byte OP_CLIENT_RAWSTATS = (byte) 34;
-    public static final byte OP_CLIENT_DATANODEREPORT = (byte) 35;
-    public static final byte OP_CLIENT_DATANODE_HINTS = (byte) 36;
+  //
+  // IPC Opcodes 
+  //
+  // Processed at namenode
+  public static final byte OP_ERROR = (byte) 0;
+  public static final byte OP_HEARTBEAT = (byte) 1;
+  public static final byte OP_BLOCKRECEIVED = (byte) 2;
+  public static final byte OP_BLOCKREPORT = (byte) 3;
+  public static final byte OP_TRANSFERDATA = (byte) 4;
+
+  // Processed at namenode, from client
+  public static final byte OP_CLIENT_OPEN = (byte) 20;
+  public static final byte OP_CLIENT_STARTFILE = (byte) 21;
+  public static final byte OP_CLIENT_ADDBLOCK = (byte) 22;
+  public static final byte OP_CLIENT_RENAMETO = (byte) 23;
+  public static final byte OP_CLIENT_DELETE = (byte) 24;  
+  public static final byte OP_CLIENT_COMPLETEFILE = (byte) 25;
+  public static final byte OP_CLIENT_LISTING = (byte) 26;
+  public static final byte OP_CLIENT_OBTAINLOCK = (byte) 27;
+  public static final byte OP_CLIENT_RELEASELOCK = (byte) 28;
+  public static final byte OP_CLIENT_EXISTS = (byte) 29;
+  public static final byte OP_CLIENT_ISDIR = (byte) 30;
+  public static final byte OP_CLIENT_MKDIRS = (byte) 31;
+  public static final byte OP_CLIENT_RENEW_LEASE = (byte) 32;
+  public static final byte OP_CLIENT_ABANDONBLOCK = (byte) 33;
+  public static final byte OP_CLIENT_RAWSTATS = (byte) 34;
+  public static final byte OP_CLIENT_DATANODEREPORT = (byte) 35;
+  public static final byte OP_CLIENT_DATANODE_HINTS = (byte) 36;
     
-    // Processed at datanode, back from namenode
-    public static final byte OP_ACK = (byte) 40;
-    public static final byte OP_TRANSFERBLOCKS = (byte) 41;    
-    public static final byte OP_INVALIDATE_BLOCKS = (byte) 42;
-    public static final byte OP_FAILURE = (byte) 43;
-
-    // Processed at client, back from namenode
-    public static final byte OP_CLIENT_OPEN_ACK = (byte) 60;
-    public static final byte OP_CLIENT_STARTFILE_ACK = (byte) 61;
-    public static final byte OP_CLIENT_ADDBLOCK_ACK = (byte) 62;
-    public static final byte OP_CLIENT_RENAMETO_ACK = (byte) 63;
-    public static final byte OP_CLIENT_DELETE_ACK = (byte) 64;
-    public static final byte OP_CLIENT_COMPLETEFILE_ACK = (byte) 65;
-    public static final byte OP_CLIENT_TRYAGAIN = (byte) 66;
-    public static final byte OP_CLIENT_LISTING_ACK = (byte) 67;
-    public static final byte OP_CLIENT_OBTAINLOCK_ACK = (byte) 68;
-    public static final byte OP_CLIENT_RELEASELOCK_ACK = (byte) 69;
-    public static final byte OP_CLIENT_EXISTS_ACK = (byte) 70;  
-    public static final byte OP_CLIENT_ISDIR_ACK = (byte) 71;
-    public static final byte OP_CLIENT_MKDIRS_ACK = (byte) 72;
-    public static final byte OP_CLIENT_RENEW_LEASE_ACK = (byte) 73;    
-    public static final byte OP_CLIENT_ABANDONBLOCK_ACK = (byte) 74;
-    public static final byte OP_CLIENT_RAWSTATS_ACK = (byte) 75;
-    public static final byte OP_CLIENT_DATANODEREPORT_ACK = (byte) 76;
-    public static final byte OP_CLIENT_DATANODE_HINTS_ACK = (byte) 77;
-
-    // Processed at datanode stream-handler
-    public static final byte OP_WRITE_BLOCK = (byte) 80;
-    public static final byte OP_READ_BLOCK = (byte) 81;
-    public static final byte OP_READSKIP_BLOCK = (byte) 82;
-    public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
-
-    // Encoding types
-    public static final byte RUNLENGTH_ENCODING = 0;
-    public static final byte CHUNKED_ENCODING = 1;
-
-    // Return codes for file create
-    public static final int OPERATION_FAILED = 0;
-    public static final int STILL_WAITING = 1;
-    public static final int COMPLETE_SUCCESS = 2;
-
-    // Chunk the block Invalidate message
-    public static final int BLOCK_INVALIDATE_CHUNK = 100;
-
-    //
-    // Timeouts, constants
-    //
-    public static long HEARTBEAT_INTERVAL = 3;
-    public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
-    public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
-    public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
-    public static int READ_TIMEOUT = 60 * 1000;
-
-    // We need to limit the length and depth of a path in the filesystem.  HADOOP-438
-    // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  
-    public static int MAX_PATH_LENGTH = 8000;
-    public static int MAX_PATH_DEPTH = 1000;
+  // Processed at datanode, back from namenode
+  public static final byte OP_ACK = (byte) 40;
+  public static final byte OP_TRANSFERBLOCKS = (byte) 41;    
+  public static final byte OP_INVALIDATE_BLOCKS = (byte) 42;
+  public static final byte OP_FAILURE = (byte) 43;
+
+  // Processed at client, back from namenode
+  public static final byte OP_CLIENT_OPEN_ACK = (byte) 60;
+  public static final byte OP_CLIENT_STARTFILE_ACK = (byte) 61;
+  public static final byte OP_CLIENT_ADDBLOCK_ACK = (byte) 62;
+  public static final byte OP_CLIENT_RENAMETO_ACK = (byte) 63;
+  public static final byte OP_CLIENT_DELETE_ACK = (byte) 64;
+  public static final byte OP_CLIENT_COMPLETEFILE_ACK = (byte) 65;
+  public static final byte OP_CLIENT_TRYAGAIN = (byte) 66;
+  public static final byte OP_CLIENT_LISTING_ACK = (byte) 67;
+  public static final byte OP_CLIENT_OBTAINLOCK_ACK = (byte) 68;
+  public static final byte OP_CLIENT_RELEASELOCK_ACK = (byte) 69;
+  public static final byte OP_CLIENT_EXISTS_ACK = (byte) 70;  
+  public static final byte OP_CLIENT_ISDIR_ACK = (byte) 71;
+  public static final byte OP_CLIENT_MKDIRS_ACK = (byte) 72;
+  public static final byte OP_CLIENT_RENEW_LEASE_ACK = (byte) 73;    
+  public static final byte OP_CLIENT_ABANDONBLOCK_ACK = (byte) 74;
+  public static final byte OP_CLIENT_RAWSTATS_ACK = (byte) 75;
+  public static final byte OP_CLIENT_DATANODEREPORT_ACK = (byte) 76;
+  public static final byte OP_CLIENT_DATANODE_HINTS_ACK = (byte) 77;
+
+  // Processed at datanode stream-handler
+  public static final byte OP_WRITE_BLOCK = (byte) 80;
+  public static final byte OP_READ_BLOCK = (byte) 81;
+  public static final byte OP_READSKIP_BLOCK = (byte) 82;
+  public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
+
+  // Encoding types
+  public static final byte RUNLENGTH_ENCODING = 0;
+  public static final byte CHUNKED_ENCODING = 1;
+
+  // Return codes for file create
+  public static final int OPERATION_FAILED = 0;
+  public static final int STILL_WAITING = 1;
+  public static final int COMPLETE_SUCCESS = 2;
+
+  // Chunk the block Invalidate message
+  public static final int BLOCK_INVALIDATE_CHUNK = 100;
+
+  //
+  // Timeouts, constants
+  //
+  public static long HEARTBEAT_INTERVAL = 3;
+  public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
+  public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
+  public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
+  public static int READ_TIMEOUT = 60 * 1000;
+
+  // We need to limit the length and depth of a path in the filesystem.  HADOOP-438
+  // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  
+  public static int MAX_PATH_LENGTH = 8000;
+  public static int MAX_PATH_DEPTH = 1000;
     
-    //TODO mb@media-style.com: should be conf injected?
-    public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  //TODO mb@media-style.com: should be conf injected?
+  public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
 
-    // SafeMode actions
-    public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+  // SafeMode actions
+  public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
-    // Startup options
-    public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
+  // Startup options
+  public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
 
-    /**
-     * Type of the node
-     */
-    static public enum NodeType {
-      NAME_NODE,
-      DATA_NODE;
-    }
-
-    // Version is reflected in the dfs image and edit log files.
-    // Version is reflected in the data storage file.
-    // Versions are negative.
-    // Decrement LAYOUT_VERSION to define a new version.
-    public static final int LAYOUT_VERSION = -4;
-    // Current version: 
-    // Top level directory is reorganized to allow file system state 
-    // transitions: upgrade, rollback, and finalize.
+  /**
+   * Type of the node
+   */
+  static public enum NodeType {
+    NAME_NODE,
+    DATA_NODE;
+  }
+
+  // Version is reflected in the dfs image and edit log files.
+  // Version is reflected in the data storage file.
+  // Versions are negative.
+  // Decrement LAYOUT_VERSION to define a new version.
+  public static final int LAYOUT_VERSION = -4;
+  // Current version: 
+  // Top level directory is reorganized to allow file system state 
+  // transitions: upgrade, rollback, and finalize.
 }
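
For readers scanning the timeout block above, the derived values are: BLOCKREPORT_INTERVAL = 60 * 60 * 1000 = 3,600,000 ms (one hour), LEASE_SOFTLIMIT_PERIOD = 60 * 1000 = 60,000 ms (one minute), LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD = 3,600,000 ms (one hour), and READ_TIMEOUT = 60 * 1000 = 60,000 ms (one minute). HEARTBEAT_INTERVAL is simply 3; its unit is not stated in this file.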

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon Apr 16 14:44:35 2007
@@ -36,609 +36,609 @@
 
 
   /**
-     * A node type that can be built into a tree reflecting the
-     * hierarchy of blocks on the local disk.
-     */
-    class FSDir {
-        File dir;
-        int numBlocks = 0;
-        FSDir children[];
-        int lastChildIdx = 0;
-        /**
-         */
-        public FSDir(File dir) 
-            throws IOException {
-            this.dir = dir;
-            this.children = null;
-            if (! dir.exists()) {
-              if (! dir.mkdirs()) {
-                throw new IOException("Mkdirs failed to create " + 
-                                      dir.toString());
-              }
-            } else {
-              File[] files = dir.listFiles();
-              int numChildren = 0;
-              for (int idx = 0; idx < files.length; idx++) {
-                if (files[idx].isDirectory()) {
-                  numChildren++;
-                } else if (Block.isBlockFilename(files[idx])) {
-                  numBlocks++;
-                }
-              }
-              if (numChildren > 0) {
-                children = new FSDir[numChildren];
-                int curdir = 0;
-                for (int idx = 0; idx < files.length; idx++) {
-                  if (files[idx].isDirectory()) {
-                    children[curdir] = new FSDir(files[idx]);
-                    curdir++;
-                  }
-                }
-              }
+   * A node type that can be built into a tree reflecting the
+   * hierarchy of blocks on the local disk.
+   */
+  class FSDir {
+    File dir;
+    int numBlocks = 0;
+    FSDir children[];
+    int lastChildIdx = 0;
+    /**
+     */
+    public FSDir(File dir) 
+      throws IOException {
+      this.dir = dir;
+      this.children = null;
+      if (! dir.exists()) {
+        if (! dir.mkdirs()) {
+          throw new IOException("Mkdirs failed to create " + 
+                                dir.toString());
+        }
+      } else {
+        File[] files = dir.listFiles();
+        int numChildren = 0;
+        for (int idx = 0; idx < files.length; idx++) {
+          if (files[idx].isDirectory()) {
+            numChildren++;
+          } else if (Block.isBlockFilename(files[idx])) {
+            numBlocks++;
+          }
+        }
+        if (numChildren > 0) {
+          children = new FSDir[numChildren];
+          int curdir = 0;
+          for (int idx = 0; idx < files.length; idx++) {
+            if (files[idx].isDirectory()) {
+              children[curdir] = new FSDir(files[idx]);
+              curdir++;
             }
+          }
         }
+      }
+    }
         
-        public File addBlock( Block b, File src ) throws IOException {
-          //First try without creating subdirectories
-          File file = addBlock( b, src, false, false );          
-          return ( file != null ) ? file : addBlock( b, src, true, true );
-        }
+    public File addBlock( Block b, File src ) throws IOException {
+      //First try without creating subdirectories
+      File file = addBlock( b, src, false, false );          
+      return ( file != null ) ? file : addBlock( b, src, true, true );
+    }
 
-        private File addBlock( Block b, File src, boolean createOk, 
-                               boolean resetIdx ) throws IOException {
-            if (numBlocks < maxBlocksPerDir) {
-              File dest = new File(dir, b.getBlockName());
-              src.renameTo(dest);
-              numBlocks += 1;
-              return dest;
-            }
-            
-            if ( lastChildIdx < 0 && resetIdx ) {
-              //reset so that all children will be checked
-              lastChildIdx = random.nextInt( children.length );              
-            }
+    private File addBlock( Block b, File src, boolean createOk, 
+                           boolean resetIdx ) throws IOException {
+      if (numBlocks < maxBlocksPerDir) {
+        File dest = new File(dir, b.getBlockName());
+        src.renameTo(dest);
+        numBlocks += 1;
+        return dest;
+      }
             
-            if ( lastChildIdx >= 0 && children != null ) {
-              //Check if any child-tree has room for a block.
-              for (int i=0; i < children.length; i++) {
-                int idx = ( lastChildIdx + i )%children.length;
-                File file = children[idx].addBlock( b, src, false, resetIdx );
-                if ( file != null ) {
-                  lastChildIdx = idx;
-                  return file; 
-                }
-              }
-              lastChildIdx = -1;
-            }
+      if ( lastChildIdx < 0 && resetIdx ) {
+        //reset so that all children will be checked
+        lastChildIdx = random.nextInt( children.length );              
+      }
             
-            if ( !createOk ) {
-              return null;
-            }
+      if ( lastChildIdx >= 0 && children != null ) {
+        //Check if any child-tree has room for a block.
+        for (int i=0; i < children.length; i++) {
+          int idx = ( lastChildIdx + i )%children.length;
+          File file = children[idx].addBlock( b, src, false, resetIdx );
+          if ( file != null ) {
+            lastChildIdx = idx;
+            return file; 
+          }
+        }
+        lastChildIdx = -1;
+      }
             
-            if ( children == null || children.length == 0 ) {
-              children = new FSDir[maxBlocksPerDir];
-              for (int idx = 0; idx < maxBlocksPerDir; idx++) {
-                children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
-              }
-            }
+      if ( !createOk ) {
+        return null;
+      }
             
-            //now pick a child randomly for creating a new set of subdirs.
-            lastChildIdx = random.nextInt( children.length );
-            return children[ lastChildIdx ].addBlock( b, src, true, false ); 
+      if ( children == null || children.length == 0 ) {
+        children = new FSDir[maxBlocksPerDir];
+        for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+          children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
         }
+      }
+            
+      //now pick a child randomly for creating a new set of subdirs.
+      lastChildIdx = random.nextInt( children.length );
+      return children[ lastChildIdx ].addBlock( b, src, true, false ); 
+    }
 
-        /**
-         * Populate the given blockSet with any child blocks
-         * found at this node.
-         */
-        public void getBlockInfo(TreeSet<Block> blockSet) {
-            if (children != null) {
-                for (int i = 0; i < children.length; i++) {
-                    children[i].getBlockInfo(blockSet);
-                }
-            }
+    /**
+     * Populate the given blockSet with any child blocks
+     * found at this node.
+     */
+    public void getBlockInfo(TreeSet<Block> blockSet) {
+      if (children != null) {
+        for (int i = 0; i < children.length; i++) {
+          children[i].getBlockInfo(blockSet);
+        }
+      }
 
-            File blockFiles[] = dir.listFiles();
-            for (int i = 0; i < blockFiles.length; i++) {
-                if (Block.isBlockFilename(blockFiles[i])) {
-                    blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
-                }
-            }
+      File blockFiles[] = dir.listFiles();
+      for (int i = 0; i < blockFiles.length; i++) {
+        if (Block.isBlockFilename(blockFiles[i])) {
+          blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
         }
+      }
+    }
 
 
-        void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
-          if (children != null) {
-                for (int i = 0; i < children.length; i++) {
-                    children[i].getVolumeMap(volumeMap, volume);
-                }
-            }
+    void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
+      if (children != null) {
+        for (int i = 0; i < children.length; i++) {
+          children[i].getVolumeMap(volumeMap, volume);
+        }
+      }
 
-            File blockFiles[] = dir.listFiles();
-            for (int i = 0; i < blockFiles.length; i++) {
-                if (Block.isBlockFilename(blockFiles[i])) {
-                    volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
-                }
-            }
+      File blockFiles[] = dir.listFiles();
+      for (int i = 0; i < blockFiles.length; i++) {
+        if (Block.isBlockFilename(blockFiles[i])) {
+          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
         }
+      }
+    }
         
-        void getBlockMap(HashMap<Block, File> blockMap) {
-          if (children != null) {
-                for (int i = 0; i < children.length; i++) {
-                    children[i].getBlockMap(blockMap);
-                }
-            }
+    void getBlockMap(HashMap<Block, File> blockMap) {
+      if (children != null) {
+        for (int i = 0; i < children.length; i++) {
+          children[i].getBlockMap(blockMap);
+        }
+      }
 
-            File blockFiles[] = dir.listFiles();
-            for (int i = 0; i < blockFiles.length; i++) {
-                if (Block.isBlockFilename(blockFiles[i])) {
-                    blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
-                }
-            }
+      File blockFiles[] = dir.listFiles();
+      for (int i = 0; i < blockFiles.length; i++) {
+        if (Block.isBlockFilename(blockFiles[i])) {
+          blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
         }
-        /**
-         * check if a data diretory is healthy
-         * @throws DiskErrorException
-         * @author hairong
-         */
-        public void checkDirTree() throws DiskErrorException {
-            DiskChecker.checkDir(dir);
+      }
+    }
+    /**
+     * check if a data diretory is healthy
+     * @throws DiskErrorException
+     * @author hairong
+     */
+    public void checkDirTree() throws DiskErrorException {
+      DiskChecker.checkDir(dir);
             
-            if (children != null) {
-                for (int i = 0; i < children.length; i++) {
-                    children[i].checkDirTree();
-                }
-            }
+      if (children != null) {
+        for (int i = 0; i < children.length; i++) {
+          children[i].checkDirTree();
         }
+      }
+    }
         
-        void clearPath(File f) {
-          String root = dir.getAbsolutePath();
-          String dir = f.getAbsolutePath();
-          if ( dir.startsWith( root ) ) {
-            String[] dirNames = dir.substring( root.length() ).
-                         split( File.separator + "subdir" );
-            if ( clearPath( f, dirNames, 1 ) )
-              return;
-          }
-          clearPath( f, null, -1 );
-        }
+    void clearPath(File f) {
+      String root = dir.getAbsolutePath();
+      String dir = f.getAbsolutePath();
+      if ( dir.startsWith( root ) ) {
+        String[] dirNames = dir.substring( root.length() ).
+          split( File.separator + "subdir" );
+        if ( clearPath( f, dirNames, 1 ) )
+          return;
+      }
+      clearPath( f, null, -1 );
+    }
         
-        /*
-         * dirNames is an array of string integers derived from
-         * usual directory structure data/subdirN/subdirXY/subdirM ...
-         * If dirName array is non-null, we only check the child at 
-         * the children[dirNames[idx]]. This avoids iterating over
-         * children in common case. If directory structure changes 
-         * in later versions, we need to revisit this.
-         */
-        private boolean clearPath( File f, String[] dirNames, int idx ) {
-          if ( ( dirNames == null || idx == dirNames.length ) &&
-               dir.compareTo(f) == 0) {
-            numBlocks--;
-            return true;
-          }
+    /*
+     * dirNames is an array of string integers derived from
+     * usual directory structure data/subdirN/subdirXY/subdirM ...
+     * If dirName array is non-null, we only check the child at 
+     * the children[dirNames[idx]]. This avoids iterating over
+     * children in common case. If directory structure changes 
+     * in later versions, we need to revisit this.
+     */
+    private boolean clearPath( File f, String[] dirNames, int idx ) {
+      if ( ( dirNames == null || idx == dirNames.length ) &&
+           dir.compareTo(f) == 0) {
+        numBlocks--;
+        return true;
+      }
           
-          if ( dirNames != null ) {
-            //guess the child index from the directory name
-            if ( idx > ( dirNames.length - 1 ) || children == null ) {
-              return false;
-            }
-            int childIdx; 
-            try {
-              childIdx = Integer.parseInt( dirNames[idx] );
-            } catch ( NumberFormatException ignored ) {
-              // layout changed? we could print a warning.
-              return false;
-            }
-            return ( childIdx >= 0 && childIdx < children.length ) ?
-              children[childIdx].clearPath( f, dirNames, idx+1 ) : false;
-          }
-
-          //guesses failed. back to blind iteration.
-          if ( children != null ) {
-            for(int i=0; i < children.length; i++) {
-              if ( children[i].clearPath( f, null, -1 ) ){
-                return true;
-              }
-            }
-          }
+      if ( dirNames != null ) {
+        //guess the child index from the directory name
+        if ( idx > ( dirNames.length - 1 ) || children == null ) {
           return false;
         }
-        
-        public String toString() {
-          return "FSDir{" +
-              "dir=" + dir +
-              ", children=" + (children == null ? null : Arrays.asList(children)) +
-              "}";
+        int childIdx; 
+        try {
+          childIdx = Integer.parseInt( dirNames[idx] );
+        } catch ( NumberFormatException ignored ) {
+          // layout changed? we could print a warning.
+          return false;
         }
-    }
+        return ( childIdx >= 0 && childIdx < children.length ) ?
+          children[childIdx].clearPath( f, dirNames, idx+1 ) : false;
+      }
 
-    class FSVolume {
-      static final double USABLE_DISK_PCT_DEFAULT = 0.98f; 
-
-      private FSDir dataDir;
-      private File tmpDir;
-      private DF usage;
-      private long reserved;
-      private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
-    
-      FSVolume( File currentDir, Configuration conf) throws IOException {
-        this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
-        this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
-            (float) USABLE_DISK_PCT_DEFAULT);
-        File parent = currentDir.getParentFile();
-        this.dataDir = new FSDir( currentDir );
-        this.tmpDir = new File(parent, "tmp");
-        if (tmpDir.exists()) {
-          FileUtil.fullyDelete(tmpDir);
-        }
-        if (!tmpDir.mkdirs()) {
-          if (!tmpDir.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      //guesses failed. back to blind iteration.
+      if ( children != null ) {
+        for(int i=0; i < children.length; i++) {
+          if ( children[i].clearPath( f, null, -1 ) ){
+            return true;
           }
         }
-        this.usage = new DF(parent, conf);
       }
-      
-      long getCapacity() throws IOException {
-        return usage.getCapacity();
+      return false;
+    }
+        
+    public String toString() {
+      return "FSDir{" +
+        "dir=" + dir +
+        ", children=" + (children == null ? null : Arrays.asList(children)) +
+        "}";
+    }
+  }
+
+  class FSVolume {
+    static final double USABLE_DISK_PCT_DEFAULT = 0.98f; 
+
+    private FSDir dataDir;
+    private File tmpDir;
+    private DF usage;
+    private long reserved;
+    private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
+    
+    FSVolume( File currentDir, Configuration conf) throws IOException {
+      this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
+                                         (float) USABLE_DISK_PCT_DEFAULT);
+      File parent = currentDir.getParentFile();
+      this.dataDir = new FSDir( currentDir );
+      this.tmpDir = new File(parent, "tmp");
+      if (tmpDir.exists()) {
+        FileUtil.fullyDelete(tmpDir);
       }
-      
-      long getAvailable() throws IOException {
-        long capacity = usage.getCapacity();
-        long freespace = Math.round(usage.getAvailableSkipRefresh() -
-                                    capacity * (1 - usableDiskPct) - reserved); 
-        return ( freespace > 0 ) ? freespace : 0;
+      if (!tmpDir.mkdirs()) {
+        if (!tmpDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+        }
       }
+      this.usage = new DF(parent, conf);
+    }
       
-      String getMount() throws IOException {
-        return usage.getMount();
-      }
+    long getCapacity() throws IOException {
+      return usage.getCapacity();
+    }
       
-      File createTmpFile(Block b) throws IOException {
-        File f = new File(tmpDir, b.getBlockName());
-        try {
-          if (f.exists()) {
-            throw new IOException("Unexpected problem in creating temporary file for "+
-                b + ".  File " + f + " should not be present, but is.");
-          }
-          // Create the zero-length temp file
-          //
-          if (!f.createNewFile()) {
-            throw new IOException("Unexpected problem in creating temporary file for "+
-                b + ".  File " + f + " should be creatable, but is already present.");
-          }
-        } catch (IOException ie) {
-          System.out.println("Exception!  " + ie);
-          throw ie;
+    long getAvailable() throws IOException {
+      long capacity = usage.getCapacity();
+      long freespace = Math.round(usage.getAvailableSkipRefresh() -
+                                  capacity * (1 - usableDiskPct) - reserved); 
+      return ( freespace > 0 ) ? freespace : 0;
+    }
+      
+    String getMount() throws IOException {
+      return usage.getMount();
+    }
+      
+    File createTmpFile(Block b) throws IOException {
+      File f = new File(tmpDir, b.getBlockName());
+      try {
+        if (f.exists()) {
+          throw new IOException("Unexpected problem in creating temporary file for "+
+                                b + ".  File " + f + " should not be present, but is.");
         }
-        return f;
+        // Create the zero-length temp file
+        //
+        if (!f.createNewFile()) {
+          throw new IOException("Unexpected problem in creating temporary file for "+
+                                b + ".  File " + f + " should be creatable, but is already present.");
+        }
+      } catch (IOException ie) {
+        System.out.println("Exception!  " + ie);
+        throw ie;
       }
+      return f;
+    }
       
-      File addBlock(Block b, File f) throws IOException {
-        return dataDir.addBlock(b, f);
-      }
+    File addBlock(Block b, File f) throws IOException {
+      return dataDir.addBlock(b, f);
+    }
       
-      void checkDirs() throws DiskErrorException {
-        dataDir.checkDirTree();
-        DiskChecker.checkDir(tmpDir);
-      }
+    void checkDirs() throws DiskErrorException {
+      dataDir.checkDirTree();
+      DiskChecker.checkDir(tmpDir);
+    }
       
-      void getBlockInfo(TreeSet<Block> blockSet) {
-        dataDir.getBlockInfo(blockSet);
-      }
+    void getBlockInfo(TreeSet<Block> blockSet) {
+      dataDir.getBlockInfo(blockSet);
+    }
       
-      void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
-        dataDir.getVolumeMap(volumeMap, this);
-      }
+    void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+      dataDir.getVolumeMap(volumeMap, this);
+    }
       
-      void getBlockMap(HashMap<Block, File> blockMap) {
-        dataDir.getBlockMap(blockMap);
-      }
+    void getBlockMap(HashMap<Block, File> blockMap) {
+      dataDir.getBlockMap(blockMap);
+    }
       
-      void clearPath(File f) {
-        dataDir.clearPath(f);
-      }
+    void clearPath(File f) {
+      dataDir.clearPath(f);
+    }
       
-      public String toString() {
-        return dataDir.dir.getAbsolutePath();
-      }
+    public String toString() {
+      return dataDir.dir.getAbsolutePath();
     }
+  }
     
-    class FSVolumeSet {
-      FSVolume[] volumes = null;
-      int curVolume = 0;
-      
-      FSVolumeSet(FSVolume[] volumes) {
-        this.volumes = volumes;
-      }
-      
-      synchronized FSVolume getNextVolume(long blockSize) throws IOException {
-        int startVolume = curVolume;
-        while (true) {
-          FSVolume volume = volumes[curVolume];
-          curVolume = (curVolume + 1) % volumes.length;
-          if (volume.getAvailable() >= blockSize) { return volume; }
-          if (curVolume == startVolume) {
-            throw new DiskOutOfSpaceException("Insufficient space for an additional block");
-          }
-        }
-      }
+  class FSVolumeSet {
+    FSVolume[] volumes = null;
+    int curVolume = 0;
+      
+    FSVolumeSet(FSVolume[] volumes) {
+      this.volumes = volumes;
+    }
       
-      synchronized long getCapacity() throws IOException {
-        long capacity = 0L;
-        for (int idx = 0; idx < volumes.length; idx++) {
-            capacity += volumes[idx].getCapacity();
+    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+      int startVolume = curVolume;
+      while (true) {
+        FSVolume volume = volumes[curVolume];
+        curVolume = (curVolume + 1) % volumes.length;
+        if (volume.getAvailable() >= blockSize) { return volume; }
+        if (curVolume == startVolume) {
+          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
         }
-        return capacity;
       }
+    }
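
getNextVolume() above is a plain round-robin scan: advance the shared cursor with wrap-around, return the first volume with room for the block, and throw only after a full lap finds nothing. A hypothetical standalone model of that loop, with a long[] of free-byte counts standing in for the FSVolume array and IllegalStateException standing in for DiskOutOfSpaceException:

public class RoundRobinVolumes {
  private int cur = 0;   // rotating cursor, shared across calls

  synchronized int nextVolume(long[] free, long blockSize) {
    int start = cur;
    while (true) {
      int candidate = cur;
      cur = (cur + 1) % free.length;     // advance with wrap-around
      if (free[candidate] >= blockSize) {
        return candidate;                // first volume with enough room wins
      }
      if (cur == start) {                // a full lap found nothing large enough
        throw new IllegalStateException("Insufficient space for an additional block");
      }
    }
  }

  public static void main(String[] args) {
    long[] free = { 10L << 20, 200L << 20, 5L << 20 };       // free bytes per volume
    System.out.println(new RoundRobinVolumes().nextVolume(free, 64L << 20));  // prints 1
  }
}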
       
-      synchronized long getRemaining() throws IOException {
-        long remaining = 0L;
-        for (int idx = 0; idx < volumes.length; idx++) {
-          remaining += volumes[idx].getAvailable();
-        }
-        return remaining;
+    synchronized long getCapacity() throws IOException {
+      long capacity = 0L;
+      for (int idx = 0; idx < volumes.length; idx++) {
+        capacity += volumes[idx].getCapacity();
       }
+      return capacity;
+    }
       
-      synchronized void getBlockInfo(TreeSet<Block> blockSet) {
-        for (int idx = 0; idx < volumes.length; idx++) {
-          volumes[idx].getBlockInfo(blockSet);
-        }
+    synchronized long getRemaining() throws IOException {
+      long remaining = 0L;
+      for (int idx = 0; idx < volumes.length; idx++) {
+        remaining += volumes[idx].getAvailable();
       }
+      return remaining;
+    }
       
-      synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
-        for (int idx = 0; idx < volumes.length; idx++) {
-          volumes[idx].getVolumeMap(volumeMap);
-        }
+    synchronized void getBlockInfo(TreeSet<Block> blockSet) {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        volumes[idx].getBlockInfo(blockSet);
       }
+    }
       
-      synchronized void getBlockMap(HashMap<Block, File> blockMap) {
-        for (int idx = 0; idx < volumes.length; idx++) {
-          volumes[idx].getBlockMap(blockMap);
-        }
+    synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        volumes[idx].getVolumeMap(volumeMap);
       }
+    }
       
-      synchronized void checkDirs() throws DiskErrorException {
-        for (int idx = 0; idx < volumes.length; idx++) {
-          volumes[idx].checkDirs();
-        }
+    synchronized void getBlockMap(HashMap<Block, File> blockMap) {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        volumes[idx].getBlockMap(blockMap);
       }
+    }
       
-      public String toString() {
-        StringBuffer sb = new StringBuffer();
-        for (int idx = 0; idx < volumes.length; idx++) {
-          sb.append(volumes[idx].toString());
-          if (idx != volumes.length - 1) { sb.append(","); }
-        }
-        return sb.toString();
+    synchronized void checkDirs() throws DiskErrorException {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        volumes[idx].checkDirs();
       }
     }
-    //////////////////////////////////////////////////////
+      
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      for (int idx = 0; idx < volumes.length; idx++) {
+        sb.append(volumes[idx].toString());
+        if (idx != volumes.length - 1) { sb.append(","); }
+      }
+      return sb.toString();
+    }
+  }
+  //////////////////////////////////////////////////////
+  //
+  // FSDataset
+  //
+  //////////////////////////////////////////////////////
+
+  FSVolumeSet volumes;
+  private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
+  private int maxBlocksPerDir = 0;
+  private HashMap<Block,FSVolume> volumeMap = null;
+  private HashMap<Block,File> blockMap = null;
+  static  Random random = new Random();
+
+  /**
+   * An FSDataset has a directory where it loads its data files.
+   */
+  public FSDataset( DataStorage storage, Configuration conf) throws IOException {
+    this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+    FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
+    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+      volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
+    }
+    volumes = new FSVolumeSet(volArray);
+    volumeMap = new HashMap<Block,FSVolume>();
+    volumes.getVolumeMap(volumeMap);
+    blockMap = new HashMap<Block,File>();
+    volumes.getBlockMap(blockMap);
+  }
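
The constructor above is driven by the datanode's storage directories plus a single tunable, dfs.datanode.numblocks (default 64, as read above). A hedged configuration sketch; the claim in the comment about what the limit controls is an assumption based on the maxBlocksPerDir name, not something this hunk shows:

import org.apache.hadoop.conf.Configuration;

public class NumBlocksConfigSketch {
  public static void main(String[] args) {
    // Assumption: dfs.datanode.numblocks caps how many blocks a data directory
    // holds before it is split into subdirectories; the constructor defaults it to 64.
    Configuration conf = new Configuration();
    conf.setInt("dfs.datanode.numblocks", 128);
    System.out.println(conf.getInt("dfs.datanode.numblocks", 64));   // prints 128
  }
}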
+
+  /**
+   * Return the total capacity (used plus unused).
+   */
+  public long getCapacity() throws IOException {
+    return volumes.getCapacity();
+  }
+
+  /**
+   * Return how many bytes can still be stored in the FSDataset
+   */
+  public long getRemaining() throws IOException {
+    return volumes.getRemaining();
+  }
+
+  /**
+   * Find the block's on-disk length
+   */
+  public long getLength(Block b) throws IOException {
+    if (! isValidBlock(b)) {
+      throw new IOException("Block " + b + " is not valid.");
+    }
+    File f = getFile(b);
+    return f.length();
+  }
+
+  /**
+   * Get a stream of data from the indicated block.
+   */
+  public synchronized InputStream getBlockData(Block b) throws IOException {
+    if (! isValidBlock(b)) {
+      throw new IOException("Block " + b + " is not valid.");
+    }
+    // File should be opened with the lock.
+    return new FileInputStream(getFile(b));
+  }
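
The read path above mirrors the write path: getLength() and getBlockData() both refuse blocks that isValidBlock() does not know about, then hand back a plain FileInputStream on the mapped file. A hedged read-side fragment; 'dataset' and 'blk' are assumed to exist in the surrounding datanode code:

  // Sketch only: reads a whole replica into memory; real callers stream it.
  static byte[] readReplica(FSDataset dataset, Block blk) throws IOException {
    int len = (int) dataset.getLength(blk);        // throws if the block is unknown
    byte[] buf = new byte[len];
    InputStream in = dataset.getBlockData(blk);    // FileInputStream on the block file
    try {
      int off = 0;
      while (off < len) {
        int n = in.read(buf, off, len - off);
        if (n < 0) {
          throw new IOException("Unexpected end of block " + blk);
        }
        off += n;
      }
    } finally {
      in.close();
    }
    return buf;
  }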
+
+  /**
+   * Start writing to a block file
+   */
+  public OutputStream writeToBlock(Block b) throws IOException {
     //
-    // FSDataSet
+    // Make sure the block isn't a valid one - we're still creating it!
     //
-    //////////////////////////////////////////////////////
-
-    FSVolumeSet volumes;
-    private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
-    private int maxBlocksPerDir = 0;
-    private HashMap<Block,FSVolume> volumeMap = null;
-    private HashMap<Block,File> blockMap = null;
-    static  Random random = new Random();
-
-    /**
-     * An FSDataset has a directory where it loads its data files.
-     */
-    public FSDataset( DataStorage storage, Configuration conf) throws IOException {
-    	this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
-        FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
-        for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-          volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
-        }
-        volumes = new FSVolumeSet(volArray);
-        volumeMap = new HashMap<Block,FSVolume>();
-        volumes.getVolumeMap(volumeMap);
-        blockMap = new HashMap<Block,File>();
-        volumes.getBlockMap(blockMap);
-    }
-
-    /**
-     * Return total capacity, used and unused
-     */
-    public long getCapacity() throws IOException {
-        return volumes.getCapacity();
-    }
-
-    /**
-     * Return how many bytes can still be stored in the FSDataset
-     */
-    public long getRemaining() throws IOException {
-        return volumes.getRemaining();
-    }
-
-    /**
-     * Find the block's on-disk length
-     */
-    public long getLength(Block b) throws IOException {
-        if (! isValidBlock(b)) {
-            throw new IOException("Block " + b + " is not valid.");
-        }
-        File f = getFile(b);
-        return f.length();
-    }
-
-    /**
-     * Get a stream of data from the indicated block.
-     */
-    public synchronized InputStream getBlockData(Block b) throws IOException {
-        if (! isValidBlock(b)) {
-            throw new IOException("Block " + b + " is not valid.");
-        }
-        // File should be opened with the lock.
-        return new FileInputStream(getFile(b));
-    }
-
-    /**
-     * Start writing to a block file
-     */
-    public OutputStream writeToBlock(Block b) throws IOException {
-        //
-        // Make sure the block isn't a valid one - we're still creating it!
-        //
-        if (isValidBlock(b)) {
-            throw new IOException("Block " + b + " is valid, and cannot be written to.");
-        }
-        long blockSize = b.getNumBytes();
-
-        //
-        // Serialize access to /tmp, and check if file already there.
-        //
-        File f = null;
-        synchronized ( this ) {
-            //
-            // Is it already in the create process?
-            //
-            if (ongoingCreates.containsKey(b)) {
-              // check how old is the temp file - wait 1 hour
-              File tmp = (File)ongoingCreates.get(b);
-              if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
-                throw new IOException("Block " + b +
-                    " has already been started (though not completed), and thus cannot be created.");
-              } else {
-                // stale temp file - remove
-                if (!tmp.delete()) {
-                  throw new IOException("Can't write the block - unable to remove stale temp file " + tmp);
-                }
-                ongoingCreates.remove(b);
-              }
-            }
-            FSVolume v = null;
-            synchronized ( volumes ) {
-              v = volumes.getNextVolume(blockSize);
-              // create temporary file to hold block in the designated volume
-              f = v.createTmpFile(b);
-            }
-            ongoingCreates.put(b, f);
-            volumeMap.put(b, v);
-        }
-
-        //
-        // Finally, allow a writer to the block file
-        // REMIND - mjc - make this a filter stream that enforces a max
-        // block size, so clients can't go crazy
-        //
-        return new FileOutputStream(f);
+    if (isValidBlock(b)) {
+      throw new IOException("Block " + b + " is valid, and cannot be written to.");
     }
+    long blockSize = b.getNumBytes();
 
     //
-    // REMIND - mjc - eventually we should have a timeout system
-    // in place to clean up block files left by abandoned clients.
-    // We should have some timer in place, so that if a blockfile
-    // is created but non-valid, and has been idle for >48 hours,
-    // we can GC it safely.
+    // Serialize access to /tmp, and check if file already there.
     //
-
-    /**
-     * Complete the block write!
-     */
-    public synchronized void finalizeBlock(Block b) throws IOException {
-        File f = ongoingCreates.get(b);
-        if (f == null || ! f.exists()) {
-          throw new IOException("No temporary file " + f + " for block " + b);
-        }
-        long finalLen = f.length();
-        b.setNumBytes(finalLen);
-        FSVolume v = volumeMap.get(b);
-        
-        File dest = null;
-        synchronized ( volumes ) {
-          dest = v.addBlock(b, f);
-        }
-        blockMap.put(b, dest);
-        ongoingCreates.remove(b);
-    }
-
-    /**
-     * Return a table of block data
-     */
-    public Block[] getBlockReport() {
-        TreeSet<Block> blockSet = new TreeSet<Block>();
-        volumes.getBlockInfo(blockSet);
-        Block blockTable[] = new Block[blockSet.size()];
-        int i = 0;
-        for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
-            blockTable[i] = it.next();
-        }
-        return blockTable;
-    }
-
-    /**
-     * Check whether the given block is a valid one.
-     */
-    public boolean isValidBlock(Block b) {
-        File f = getFile(b);
-        return (f!= null && f.exists());
-    }
-
-    /**
-     * We're informed that a block is no longer valid.  We
-     * could lazily garbage-collect the block, but why bother?
-     * just get rid of it.
-     */
-    public void invalidate(Block invalidBlks[]) throws IOException {
-      for (int i = 0; i < invalidBlks.length; i++) {
-        File f;
-        synchronized (this) {
-          f = getFile(invalidBlks[i]);
-          if (f == null) {
-            throw new IOException("Unexpected error trying to delete block "
-                                  + invalidBlks[i] + 
-                                  ". Block not found in blockMap.");
-          }
-          FSVolume v = volumeMap.get(invalidBlks[i]);
-          if (v == null) {
-            throw new IOException("Unexpected error trying to delete block "
-                                  + invalidBlks[i] + 
-                                  ". No volume for this block.");
-          }
-          File parent = f.getParentFile();
-          if (parent == null) {
-            throw new IOException("Unexpected error trying to delete block "
-                                  + invalidBlks[i] + 
-                                  ". Parent not found for file " + f + ".");
-          }
-          v.clearPath(parent);
-          blockMap.remove(invalidBlks[i]);
-          volumeMap.remove(invalidBlks[i]);
-        }
-        if (!f.delete()) {
-            throw new IOException("Unexpected error trying to delete block "
-                                  + invalidBlks[i] + " at file " + f);
-        }
-        DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+    File f = null;
+    synchronized ( this ) {
+      //
+      // Is it already in the create process?
+      //
+      if (ongoingCreates.containsKey(b)) {
+        // check how old the temp file is - wait 1 hour
+        File tmp = (File)ongoingCreates.get(b);
+        if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+          throw new IOException("Block " + b +
+                                " has already been started (though not completed), and thus cannot be created.");
+        } else {
+          // stale temp file - remove
+          if (!tmp.delete()) {
+            throw new IOException("Can't write the block - unable to remove stale temp file " + tmp);
+          }
+          ongoingCreates.remove(b);
+        }
+      }
+      FSVolume v = null;
+      synchronized ( volumes ) {
+        v = volumes.getNextVolume(blockSize);
+        // create temporary file to hold block in the designated volume
+        f = v.createTmpFile(b);
       }
+      ongoingCreates.put(b, f);
+      volumeMap.put(b, v);
     }
 
-    /**
-     * Turn the block identifier into a filename.
-     */
-    synchronized File getFile(Block b) {
-      return blockMap.get(b);
-    }
+    //
+    // Finally, allow a writer to the block file
+    // REMIND - mjc - make this a filter stream that enforces a max
+    // block size, so clients can't go crazy
+    //
+    return new FileOutputStream(f);
+  }
 
-    /**
-     * check if a data diretory is healthy
-     * @throws DiskErrorException
-     * @author hairong
-     */
-    void checkDataDir() throws DiskErrorException {
-        volumes.checkDirs();
-    }
+  //
+  // REMIND - mjc - eventually we should have a timeout system
+  // in place to clean up block files left by abandoned clients.
+  // We should have some timer in place, so that if a blockfile
+  // is created but non-valid, and has been idle for >48 hours,
+  // we can GC it safely.
+  //
+
+  /**
+   * Complete the block write!
+   */
+  public synchronized void finalizeBlock(Block b) throws IOException {
+    File f = ongoingCreates.get(b);
+    if (f == null || ! f.exists()) {
+      throw new IOException("No temporary file " + f + " for block " + b);
+    }
+    long finalLen = f.length();
+    b.setNumBytes(finalLen);
+    FSVolume v = volumeMap.get(b);
+        
+    File dest = null;
+    synchronized ( volumes ) {
+      dest = v.addBlock(b, f);
+    }
+    blockMap.put(b, dest);
+    ongoingCreates.remove(b);
+  }
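
Together, writeToBlock() and finalizeBlock() above form the replica write lifecycle: reserve a temp file on some volume, stream the bytes, then promote the temp file into the block directory and record it in blockMap. A hedged usage fragment, assuming an FSDataset, a Block, and a byte[] payload already exist (how Block instances are constructed is outside this hunk):

  // Sketch only: 'dataset', 'blk' and 'payload' are assumed to come from the
  // surrounding datanode code; only the call order matters here.
  static void writeReplica(FSDataset dataset, Block blk, byte[] payload)
      throws IOException {
    OutputStream out = dataset.writeToBlock(blk);   // reserves a temp file on a volume
    try {
      out.write(payload);                           // stream the replica's bytes
    } finally {
      out.close();
    }
    dataset.finalizeBlock(blk);                     // move the temp file into the block dir
  }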
+
+  /**
+   * Return a table of block data
+   */
+  public Block[] getBlockReport() {
+    TreeSet<Block> blockSet = new TreeSet<Block>();
+    volumes.getBlockInfo(blockSet);
+    Block blockTable[] = new Block[blockSet.size()];
+    int i = 0;
+    for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
+      blockTable[i] = it.next();
+    }
+    return blockTable;
+  }
+
+  /**
+   * Check whether the given block is a valid one.
+   */
+  public boolean isValidBlock(Block b) {
+    File f = getFile(b);
+    return (f!= null && f.exists());
+  }
+
+  /**
+   * We're informed that a block is no longer valid.  We
+   * could lazily garbage-collect the block, but why bother?
+   * Just get rid of it.
+   */
+  public void invalidate(Block invalidBlks[]) throws IOException {
+    for (int i = 0; i < invalidBlks.length; i++) {
+      File f;
+      synchronized (this) {
+        f = getFile(invalidBlks[i]);
+        if (f == null) {
+          throw new IOException("Unexpected error trying to delete block "
+                                + invalidBlks[i] + 
+                                ". Block not found in blockMap.");
+        }
+        FSVolume v = volumeMap.get(invalidBlks[i]);
+        if (v == null) {
+          throw new IOException("Unexpected error trying to delete block "
+                                + invalidBlks[i] + 
+                                ". No volume for this block.");
+        }
+        File parent = f.getParentFile();
+        if (parent == null) {
+          throw new IOException("Unexpected error trying to delete block "
+                                + invalidBlks[i] + 
+                                ". Parent not found for file " + f + ".");
+        }
+        v.clearPath(parent);
+        blockMap.remove(invalidBlks[i]);
+        volumeMap.remove(invalidBlks[i]);
+      }
+      if (!f.delete()) {
+        throw new IOException("Unexpected error trying to delete block "
+                              + invalidBlks[i] + " at file " + f);
+      }
+      DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+    }
+  }
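
Note that invalidate() above drops the block from blockMap and volumeMap while holding the dataset lock, but calls File.delete() after leaving the synchronized block, so a slow or failing disk cannot stall other dataset operations. A minimal standalone sketch of that lock-then-delete pattern (the single-map bookkeeping and names are simplifications, not the patch's own structure):

import java.io.File;
import java.io.IOException;
import java.util.HashMap;

public class InvalidateSketch {
  private final HashMap<String, File> blockFiles = new HashMap<String, File>();

  void invalidate(String blockName) throws IOException {
    File f;
    synchronized (this) {
      f = blockFiles.remove(blockName);   // forget the block while holding the lock
      if (f == null) {
        throw new IOException("Block " + blockName + " not found");
      }
    }
    if (!f.delete()) {                    // the slow disk I/O happens outside the lock
      throw new IOException("Could not delete " + f);
    }
  }
}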
+
+  /**
+   * Turn the block identifier into a filename.
+   */
+  synchronized File getFile(Block b) {
+    return blockMap.get(b);
+  }
+
+  /**
+   * Check if a data directory is healthy.
+   * @throws DiskErrorException
+   * @author hairong
+   */
+  void checkDataDir() throws DiskErrorException {
+    volumes.checkDirs();
+  }
     
 
-    public String toString() {
-      return "FSDataset{dirpath='"+volumes+"'}";
-    }
+  public String toString() {
+    return "FSDataset{dirpath='"+volumes+"'}";
+  }
 
 }