Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2007/12/24 23:20:53 UTC

svn commit: r606744 [2/3] - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSNamesystem.java src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=606744&r1=606743&r2=606744&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Dec 24 14:20:52 2007
@@ -1,3758 +1,3758 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.dfs;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.mapred.StatusHttpServer;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.Server;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.text.SimpleDateFormat;
-
-/***************************************************
- * FSNamesystem does the actual bookkeeping work for the
- * NameNode.
- *
- * It tracks several important tables.
- *
- * 1)  valid fsname --> blocklist  (kept on disk, logged)
- * 2)  Set of all valid blocks (inverted #1)
- * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
- * 4)  machine --> blocklist (inverted #2)
- * 5)  LRU cache of updated-heartbeat machines
- ***************************************************/
-class FSNamesystem implements FSConstants {
-  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
-
-  //
-  // Stores the correct file name hierarchy
-  //
-  FSDirectory dir;
-
-  //
-  // Stores the block-->datanode(s) map.  Updated only in response
-  // to client-sent information.
-  // Mapping: Block -> { INode, datanodes, self ref } 
-  //
-  BlocksMap blocksMap = new BlocksMap();
-    
-  /**
-   * Stores the datanode -> block map.  
-   * <p>
-   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
-   * storage id. In order to keep the storage map consistent it tracks 
-   * all storages ever registered with the namenode.
-   * A descriptor corresponding to a specific storage id can be
-   * <ul> 
-   * <li>added to the map if it is a new storage id;</li>
-   * <li>updated with a new datanode started as a replacement for the old one 
-   * with the same storage id; and </li>
-   * <li>removed if and only if an existing datanode is restarted to serve a
-   * different storage id.</li>
-   * </ul> <br>
-   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
-   * in the namespace image file. Only the {@link DatanodeInfo} part is 
-   * persistent, the list of blocks is restored from the datanode block
-   * reports. 
-   * <p>
-   * Mapping: StorageID -> DatanodeDescriptor
-   */
-  Map<String, DatanodeDescriptor> datanodeMap = 
-    new TreeMap<String, DatanodeDescriptor>();
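  // To make the three cases in the javadoc above concrete, a hypothetical
  // sequence of registrations (storage ids and node names are assumed,
  // purely for illustration):
  //   1. "storage-7" registers with datanode A  -> a new descriptor is added;
  //   2. datanode B registers with the same "storage-7" (a replacement
  //      machine) -> the existing descriptor is updated, not duplicated;
  //   3. datanode A restarts and registers with a different "storage-9"
  //      -> only then is A's old entry removed from the map.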
-
-  //
-  // Keeps a Collection for every named machine containing
-  // blocks that have recently been invalidated and are thought to live
-  // on the machine in question.
-  // Mapping: StorageID -> ArrayList<Block>
-  //
-  private Map<String, Collection<Block>> recentInvalidateSets = 
-    new TreeMap<String, Collection<Block>>();
-
-  //
-  // Keeps a TreeSet for every named node.  Each treeset contains
-  // a list of the blocks that are "extra" at that location.  We'll
-  // eventually remove these extras.
-  // Mapping: StorageID -> TreeSet<Block>
-  //
-  private Map<String, Collection<Block>> excessReplicateMap = 
-    new TreeMap<String, Collection<Block>>();
-
-  //
-  // Stats on overall usage
-  //
-  long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L;
-
-  // total number of client connections summed over all live datanodes
-  int totalLoad = 0;
-
-
-  //
-  // For the HTTP browsing interface
-  //
-  StatusHttpServer infoServer;
-  int infoPort;
-  Date startTime;
-    
-  //
-  Random r = new Random();
-
-  /**
-   * Stores a set of DatanodeDescriptor objects.
-   * This is a subset of {@link #datanodeMap}, containing nodes that are 
-   * considered alive.
-   * The {@link HeartbeatMonitor} periodically checks for outdated entries,
-   * and removes them from the list.
-   */
-  ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
-  //
-  // Store set of Blocks that need to be replicated 1 or more times.
-  // We also store pending replication-orders.
-  // Set of: Block
-  //
-  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private PendingReplicationBlocks pendingReplications;
-
-  //
-  // Used for handling lock-leases
-  // Mapping: leaseHolder -> Lease
-  //
-  private Map<StringBytesWritable, Lease> leases = new TreeMap<StringBytesWritable, Lease>();
-  // Set of: Lease
-  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
-
-  //
-  // Daemon threads that monitor datanode heartbeats, lease expiration,
-  // safe mode, and pending replication work.
-  //
-  Daemon hbthread = null;   // HeartbeatMonitor thread
-  Daemon lmthread = null;   // LeaseMonitor thread
-  Daemon smmthread = null;  // SafeModeMonitor thread
-  Daemon replthread = null;  // Replication thread
-  volatile boolean fsRunning = true;
-  long systemStart = 0;
-
-  //  The maximum number of replicas we should allow for a single block
-  private int maxReplication;
-  //  How many outgoing replication streams a given node should have at one time
-  private int maxReplicationStreams;
-  // minReplication is how many copies we need in place or else we disallow the write
-  private int minReplication;
-  // Default replication
-  private int defaultReplication;
-  // heartbeatRecheckInterval is how often namenode checks for expired datanodes
-  private long heartbeatRecheckInterval;
-  // heartbeatExpireInterval is how long namenode waits for datanode to report
-  // heartbeat
-  private long heartbeatExpireInterval;
-  //replicationRecheckInterval is how often namenode checks for new replication work
-  private long replicationRecheckInterval;
-  //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
-  private long decommissionRecheckInterval;
-  // default block size of a file
-  private long defaultBlockSize = 0;
-  private int replIndex = 0; // last datanode used for replication work
-  static int REPL_WORK_PER_ITERATION = 32; // max percentage of datanodes given replication work per iteration
-
-  public static FSNamesystem fsNamesystemObject;
-  private String localMachine;
-  private int port;
-  private SafeModeInfo safeMode;  // safe mode information
-  private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
-    
-  // datanode network topology
-  NetworkTopology clusterMap = new NetworkTopology();
-  // for block replicas placement
-  ReplicationTargetChooser replicator;
-
-  private HostsFileReader hostsReader; 
-  private Daemon dnthread = null;
-
-  // can fs-image be rolled?
-  volatile private CheckpointStates ckptState = CheckpointStates.START; 
-
-  private static final SimpleDateFormat DATE_FORM =
-    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-
-  /**
-   * FSNamesystem constructor.
-   */
-  FSNamesystem(NameNode nn, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
-    try {
-      initialize(nn, conf);
-    } catch(IOException e) {
-      close();
-      throw e;
-    }
-  }
-
-  /**
-   * Initialize FSNamesystem.
-   */
-  private void initialize(NameNode nn, Configuration conf) throws IOException {
-    setConfigurationParameters(conf);
-
-    this.localMachine = nn.getNameNodeAddress().getHostName();
-    this.port = nn.getNameNodeAddress().getPort();
-    this.dir = new FSDirectory(this, conf);
-    StartupOption startOpt = NameNode.getStartupOption(conf);
-    this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
-    this.safeMode = new SafeModeInfo(conf);
-    setBlockTotal();
-    pendingReplications = new PendingReplicationBlocks(
-                            conf.getInt("dfs.replication.pending.timeout.sec", 
-                                        -1) * 1000L);
-    this.hbthread = new Daemon(new HeartbeatMonitor());
-    this.lmthread = new Daemon(new LeaseMonitor());
-    this.replthread = new Daemon(new ReplicationMonitor());
-    hbthread.start();
-    lmthread.start();
-    replthread.start();
-    this.systemStart = now();
-    this.startTime = new Date(systemStart); 
-
-    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
-                                           conf.get("dfs.hosts.exclude",""));
-    this.dnthread = new Daemon(new DecommissionedMonitor());
-    dnthread.start();
-
-    String infoAddr = conf.get("dfs.http.bindAddress", "0.0.0.0:50070");
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-    String infoHost = infoSocAddr.getHostName();
-    int tmpInfoPort = infoSocAddr.getPort();
-    this.infoServer = new StatusHttpServer("dfs", infoHost, tmpInfoPort, 
-                                            tmpInfoPort == 0);
-    this.infoServer.setAttribute("name.system", this);
-    this.infoServer.setAttribute("name.node", nn);
-    this.infoServer.setAttribute("name.conf", conf);
-    this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
-    this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
-    this.infoServer.addServlet("listPaths", "/listPaths/*", ListPathsServlet.class);
-    this.infoServer.addServlet("data", "/data/*", FileDataServlet.class);
-    this.infoServer.start();
-
-    // The web-server port can be ephemeral... ensure we have the correct info
-    this.infoPort = this.infoServer.getPort();
-    conf.set("dfs.http.bindAddress", infoHost + ":" + infoPort); 
-    LOG.info("Web-server up at: " + conf.get("dfs.http.bindAddress"));
-  }
-
-  static Collection<File> getNamespaceDirs(Configuration conf) {
-    String[] dirNames = conf.getStrings("dfs.name.dir");
-    if (dirNames == null)
-      dirNames = new String[] {"/tmp/hadoop/dfs/name"};
-    Collection<File> dirs = new ArrayList<File>(dirNames.length);
-    for(int idx = 0; idx < dirNames.length; idx++) {
-      dirs.add(new File(dirNames[idx]));
-    }
-    return dirs;
-  }
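  // A minimal sketch of how getNamespaceDirs() behaves, assuming
  // Configuration.getStrings splits the comma-separated value (the paths
  // here are illustrative only):
  //
  //   Configuration conf = new Configuration();
  //   conf.set("dfs.name.dir", "/data/1/dfs/name,/data/2/dfs/name");
  //   Collection<File> dirs = FSNamesystem.getNamespaceDirs(conf);
  //   // -> two File entries, one per configured directory; with
  //   // "dfs.name.dir" unset, the single default /tmp/hadoop/dfs/name
  //   // is returned instead.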
-
-  /**
-   * Alternative FSNamesystem constructor: the filesystem directory state
-   * is taken from the given fsImage instead of being loaded from the
-   * configured name directories.
-   */
-  FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
-    fsNamesystemObject = this;
-    setConfigurationParameters(conf);
-    this.dir = new FSDirectory(fsImage, this, conf);
-  }
-
-  /**
-   * Initializes some of the members from configuration
-   */
-  private void setConfigurationParameters(Configuration conf) 
-                                          throws IOException {
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
-                         this,
-                         clusterMap);
-    this.defaultReplication = conf.getInt("dfs.replication", 3);
-    this.maxReplication = conf.getInt("dfs.replication.max", 512);
-    this.minReplication = conf.getInt("dfs.replication.min", 1);
-    if (minReplication <= 0)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = " 
-                            + minReplication
-                            + " must be greater than 0");
-    if (maxReplication >= (int)Short.MAX_VALUE)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.max = " 
-                            + maxReplication + " must be less than " + (Short.MAX_VALUE));
-    if (maxReplication < minReplication)
-      throw new IOException(
-                            "Unexpected configuration parameters: dfs.replication.min = " 
-                            + minReplication
-                            + " must be less than dfs.replication.max = " 
-                            + maxReplication);
-    this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
-    long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
-    this.heartbeatRecheckInterval = conf.getInt(
-        "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
-    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
-      10 * heartbeatInterval;
-    this.replicationRecheckInterval = 3 * 1000; // 3 seconds
-    this.decommissionRecheckInterval = conf.getInt(
-                                                   "dfs.namenode.decommission.interval",
-                                                   5 * 60 * 1000);    
-    this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-  }
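  // Worked example of the expiry arithmetic above, using the defaults
  // (heartbeat.recheck.interval = 5 min, dfs.heartbeat.interval = 3 s):
  //
  //   heartbeatExpireInterval = 2 * 300000 + 10 * 3000 = 630000 ms
  //
  // so a datanode that stops heartbeating is declared dead roughly
  // 10.5 minutes later, on the next HeartbeatMonitor recheck.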
-
-  /** Return the FSNamesystem object. */
-  public static FSNamesystem getFSNamesystem() {
-    return fsNamesystemObject;
-  } 
-
-  NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getNamespaceID(),
-                             dir.fsImage.getCTime(),
-                             getDistributedUpgradeVersion());
-  }
-
-  /** Close down this filesystem manager.
-   * Causes heartbeat and lease daemons to stop; waits briefly for
-   * them to finish, but a short timeout returns control back to caller.
-   */
-  public void close() {
-    fsRunning = false;
-    try {
-      if (pendingReplications != null) pendingReplications.stop();
-      if (infoServer != null) infoServer.stop();
-      if (hbthread != null) hbthread.interrupt();
-      if (replthread != null) replthread.interrupt();
-      if (dnthread != null) dnthread.interrupt();
-      if (smmthread != null) smmthread.interrupt();
-    } catch (InterruptedException ie) {
-    } finally {
-      // using finally to ensure we also wait for lease daemon
-      try {
-        if (lmthread != null) {
-          lmthread.interrupt();
-          lmthread.join(3000);
-        }
-      } catch (InterruptedException ie) {
-      } finally {
-        try {
-          dir.close();
-        } catch (IOException ex) {
-          // do nothing
-        }
-      }
-    }
-  }
-
-  /**
-   * Dump all metadata into specified file
-   */
-  void metaSave(String filename) throws IOException {
-    File file = new File(System.getProperty("hadoop.log.dir"), 
-                         filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(
-                                                         new FileWriter(file, true)));
-
-    //
-    // Dump contents of neededReplication
-    //
-    synchronized (neededReplications) {
-      out.println("Metasave: Blocks waiting for replication: " + 
-                  neededReplications.size());
-      if (neededReplications.size() > 0) {
-        for (Iterator<Block> it = neededReplications.iterator(); 
-             it.hasNext();) {
-          Block block = it.next();
-          out.print(block);
-          for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-               jt.hasNext();) {
-            DatanodeDescriptor node = jt.next();
-            out.print(" " + node + " : ");
-          }
-          out.println("");
-        }
-      }
-    }
-
-    //
-    // Dump blocks from pendingReplication
-    //
-    pendingReplications.metaSave(out);
-
-    //
-    // Dump blocks that are waiting to be deleted
-    //
-    dumpRecentInvalidateSets(out);
-
-    //
-    // Dump all datanodes
-    //
-    datanodeDump(out);
-
-    out.flush();
-    out.close();
-  }
-
-  long getDefaultBlockSize() {
-    return defaultBlockSize;
-  }
-    
-  /* get replication factor of a block */
-  private int getReplication(Block block) {
-    INodeFile fileINode = blocksMap.getINode(block);
-    if (fileINode == null) { // block does not belong to any file
-      return 0;
-    }
-    assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
-    return fileINode.getReplication();
-  }
-
-  /* updates a block in the under-replication queue */
-  synchronized void updateNeededReplications(Block block,
-                        int curReplicasDelta, int expectedReplicasDelta) {
-    NumberReplicas repl = countNodes(block);
-    int curExpectedReplicas = getReplication(block);
-    neededReplications.update(block, 
-                              repl.liveReplicas(), 
-                              repl.decommissionedReplicas(),
-                              curExpectedReplicas,
-                              curReplicasDelta, expectedReplicasDelta);
-  }
-
-  /**
-   * Used only during DFS upgrade for block level CRCs (HADOOP-1134).
-   * This returns information for a given block that includes:
-   * <li> full path name for the file that contains the block.
-   * <li> offset of first byte of the block.
-   * <li> file length and length of the block.
-   * <li> all block locations for the crc file (".file.crc").
-   * <li> replication for crc file.
-   * When replicas is true, it includes replicas of the block.
-   */
-  public synchronized BlockCrcInfo blockCrcInfo(
-                           Block block,
-                           BlockCrcUpgradeObjectNamenode namenodeUpgradeObj,
-                           boolean replicas) {
-    BlockCrcInfo crcInfo = new BlockCrcInfo();
-    crcInfo.status = BlockCrcInfo.STATUS_ERROR;
-    
-    INodeFile fileINode = blocksMap.getINode(block);
-    if ( fileINode == null || fileINode.isDirectory() ) {
-      // The most probable reason is that this block does not exist
-      if (blocksMap.getStoredBlock(block) == null) {
-        crcInfo.status = BlockCrcInfo.STATUS_UNKNOWN_BLOCK;
-      } else {
-        LOG.warn("getBlockCrcInfo(): Could not find file for " + block);
-      }
-      return crcInfo;
-    }
-
-    crcInfo.fileName = "localName:" + fileINode.getLocalName();
-    
-    // Find the offset and length for this block.
-    Block[] fileBlocks = fileINode.getBlocks();
-    crcInfo.blockLen = -1;
-    if ( fileBlocks != null ) {
-      for ( Block b:fileBlocks ) {
-        if ( block.equals(b) ) {
-          crcInfo.blockLen = b.getNumBytes();
-        }
-        if ( crcInfo.blockLen < 0 ) {
-          crcInfo.startOffset += b.getNumBytes();
-        }
-        crcInfo.fileSize += b.getNumBytes();
-      }
-    }
-
-    if ( crcInfo.blockLen < 0 ) {
-      LOG.warn("blockCrcInfo(): " + block + 
-               " could not be found in blocks for " + crcInfo.fileName);
-      return crcInfo;
-    }
-    
-    String fileName = fileINode.getLocalName();    
-    if ( fileName.startsWith(".") && fileName.endsWith(".crc") ) {
-      crcInfo.status = BlockCrcInfo.STATUS_CRC_BLOCK;
-      return crcInfo;
-    }
-
-    if (replicas) {
-      // include block replica locations, instead of crcBlocks
-      crcInfo.blockLocationsIncluded = true;
-      
-      DatanodeInfo[] dnInfo = new DatanodeInfo[blocksMap.numNodes(block)];
-      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-      for (int i=0; it != null && it.hasNext(); i++ ) {
-        dnInfo[i] = new DatanodeInfo(it.next());
-      }
-      crcInfo.blockLocations = new LocatedBlock(block, dnInfo, 
-                                                crcInfo.startOffset);
-    } else {
-
-      //Find CRC file
-      BlockCrcUpgradeObjectNamenode.INodeMapEntry entry =
-                                namenodeUpgradeObj.getINodeMapEntry(fileINode);
-      
-      if (entry == null || entry.parent == null) {
-        LOG.warn("Could not find parent INode for " + fileName + "  " + block);
-        return crcInfo;
-      }
-      
-      crcInfo.fileName = entry.getAbsoluteName();
-      
-      String crcName = "." + fileName + ".crc";
-      INode iNode = entry.getParentINode().getChild(crcName);
-      if (iNode == null || iNode.isDirectory()) {
-        // Should we log this?
-        crcInfo.status = BlockCrcInfo.STATUS_NO_CRC_DATA;
-        return crcInfo;
-      }
-
-      INodeFile crcINode = (INodeFile)iNode;
-      Block[] blocks = crcINode.getBlocks();
-      if ( blocks == null )  {
-        LOG.warn("getBlockCrcInfo(): could not find blocks for crc file for " +
-                 crcInfo.fileName);
-        return crcInfo;
-      }
-
-      crcInfo.crcBlocks = new LocatedBlock[ blocks.length ];
-      for (int i=0; i<blocks.length; i++) {
-        DatanodeInfo[] dnArr = new DatanodeInfo[ blocksMap.numNodes(blocks[i]) ];
-        int idx = 0;
-        for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[i]); 
-        it.hasNext();) { 
-          dnArr[ idx++ ] = it.next();
-        }
-        crcInfo.crcBlocks[i] = new LocatedBlock(blocks[i], dnArr);
-      }
-
-      crcInfo.crcReplication = crcINode.getReplication();
-    }
-    
-    crcInfo.status = BlockCrcInfo.STATUS_DATA_BLOCK;
-    return crcInfo;
-  }
-  
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by secondary namenodes
-  //
-  /////////////////////////////////////////////////////////
-  /**
-   * Return a list of blocks and their locations on <code>datanode</code>
-   * whose cumulative size reaches <code>size</code> (or all of the node's
-   * valid blocks, if they total less than that).
-   * 
-   * @param datanode on which blocks are located
-   * @param size total size of blocks
-   */
-  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
-      throws IOException {
-    DatanodeDescriptor node = getDatanode(datanode);
-    if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-          + "Asking for blocks from an unrecorded node " + datanode.getName());
-      throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " + 
-          datanode.getName() + ", but there is no info for it");
-    }
-
-    int numBlocks = node.numBlocks();
-    if(numBlocks == 0) {
-      return new BlocksWithLocations(new BlockWithLocations[0]);
-    }
-    Iterator<Block> iter = node.getBlockIterator();
-    int startBlock = r.nextInt(numBlocks); // starting from a random block
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-    long totalSize = 0;
-    while(totalSize<size && iter.hasNext()) {
-      totalSize += addBlock(iter.next(), results);
-    }
-    if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
-        totalSize += addBlock(iter.next(), results);
-      }
-    }
-    
-    return new BlocksWithLocations(
-        results.toArray(new BlockWithLocations[results.size()]));
-  }
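  // A sketch of the wrap-around scan above with assumed numbers: for
  // numBlocks = 5 and a random startBlock = 3, blocks are visited in the
  // order 3, 4 and then, if totalSize is still below the requested size,
  // 0, 1, 2 -- at most one full pass over the node's block list, stopping
  // as soon as enough bytes have been collected.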
-  
-  /* Get all valid locations of the block & add the block to results
-   * return the length of the added block; 0 if the block is not added
-   */
-  private long addBlock(Block block, List<BlockWithLocations> results) {
-    ArrayList<String> machineSet =
-      new ArrayList<String>(blocksMap.numNodes(block));
-    for(Iterator<DatanodeDescriptor> it = 
-      blocksMap.nodeIterator(block); it.hasNext();) {
-      String storageID = it.next().getStorageID();
-      // filter out replicas that are scheduled for invalidation
-      Collection<Block> blocks = recentInvalidateSets.get(storageID); 
-      if(blocks==null || !blocks.contains(block)) {
-        machineSet.add(storageID);
-      }
-    }
-    if(machineSet.size() == 0) {
-      return 0;
-    } else {
-      results.add(new BlockWithLocations(block, 
-          machineSet.toArray(new String[machineSet.size()])));
-      return block.getNumBytes();
-    }
-  }
-
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by HadoopFS clients
-  //
-  /////////////////////////////////////////////////////////
-  /**
-   * Get block locations within the specified range.
-   * 
-   * @see ClientProtocol#open(String, long, long)
-   * @see ClientProtocol#getBlockLocations(String, long, long)
-   */
-  LocatedBlocks getBlockLocations(String clientMachine,
-                                  String src, 
-                                  long offset, 
-                                  long length
-                                  ) throws IOException {
-    if (offset < 0) {
-      throw new IOException("Negative offset is not supported. File: " + src );
-    }
-    if (length < 0) {
-      throw new IOException("Negative length is not supported. File: " + src );
-    }
-
-    DatanodeDescriptor client = null;
-    LocatedBlocks blocks =  getBlockLocations(dir.getFileINode(src), 
-                                              offset, length, 
-                                              Integer.MAX_VALUE);
-    if (blocks == null) {
-      return null;
-    }
-    client = host2DataNodeMap.getDatanodeByHost(clientMachine);
-    for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
-         it.hasNext();) {
-      LocatedBlock block = it.next();
-      clusterMap.pseudoSortByDistance(client, 
-                                (DatanodeDescriptor[])(block.getLocations()));
-    }
-    return blocks;
-  }
-  
-  private synchronized LocatedBlocks getBlockLocations(INodeFile inode, 
-                                                       long offset, 
-                                                       long length,
-                                                       int nrBlocksToReturn) {
-    if(inode == null) {
-      return null;
-    }
-    Block[] blocks = inode.getBlocks();
-    if (blocks == null) {
-      return null;
-    }
-    if (blocks.length == 0) {
-      return new LocatedBlocks(inode, new ArrayList<LocatedBlock>(blocks.length));
-    }
-    List<LocatedBlock> results;
-    results = new ArrayList<LocatedBlock>(blocks.length);
-
-    int curBlk = 0;
-    long curPos = 0, blkSize = 0;
-    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
-    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
-      blkSize = blocks[curBlk].getNumBytes();
-      assert blkSize > 0 : "Block of size 0";
-      if (curPos + blkSize > offset) {
-        break;
-      }
-      curPos += blkSize;
-    }
-    
-    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
-    
-    long endOff = offset + length;
-    
-    do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numNodes];
-      if (numNodes > 0) {
-        numNodes = 0;
-        for(Iterator<DatanodeDescriptor> it = 
-            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          machineSet[numNodes++] = it.next();
-        }
-      }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
-      curPos += blocks[curBlk].getNumBytes();
-      curBlk++;
-    } while (curPos < endOff 
-          && curBlk < blocks.length 
-          && results.size() < nrBlocksToReturn);
-    
-    return new LocatedBlocks(inode, results);
-  }
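  // Worked example for the offset arithmetic above (sizes assumed): for a
  // file of three 64 MB blocks and a request with offset = 100 MB and
  // length = 10 MB, the first loop skips block 0 (curPos becomes 64 MB)
  // and stops at block 1, since 64 MB + 64 MB > 100 MB; the do-loop then
  // returns block 1 alone, because after adding it curPos = 128 MB already
  // exceeds endOff = 110 MB.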
-
-  /**
-   * Set replication for an existing file.
-   * 
-   * The NameNode sets new replication and schedules either replication of 
-   * under-replicated data blocks or removal of the excessive block copies
-   * if the blocks are over-replicated.
-   * 
-   * @see ClientProtocol#setReplication(String, short)
-   * @param src file name
-   * @param replication new replication
-   * @return true if successful; 
-   *         false if file does not exist or is a directory
-   */
-  public boolean setReplication(String src, short replication) 
-                                throws IOException {
-    boolean status = setReplicationInternal(src, replication);
-    getEditLog().logSync();
-    return status;
-  }
-
-  private synchronized boolean setReplicationInternal(String src, 
-                                             short replication
-                                             ) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set replication for " + src, safeMode);
-    verifyReplication(src, replication, null);
-
-    int[] oldReplication = new int[1];
-    Block[] fileBlocks;
-    fileBlocks = dir.setReplication(src, replication, oldReplication);
-    if (fileBlocks == null)  // file not found or is a directory
-      return false;
-    int oldRepl = oldReplication[0];
-    if (oldRepl == replication) // the same replication
-      return true;
-
-    // update needReplication priority queues
-    LOG.info("Increasing replication for file " + src 
-             + ". New replication is " + replication);
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-      
-    if (oldRepl > replication) {  
-      // old replication > the new one; need to remove copies
-      LOG.info("Reducing replication for file " + src 
-               + ". New replication is " + replication);
-      for(int idx = 0; idx < fileBlocks.length; idx++)
-        proccessOverReplicatedBlock(fileBlocks[idx], replication, null, null);
-    }
-    return true;
-  }
-    
-  public long getPreferredBlockSize(String filename) throws IOException {
-    return dir.getPreferredBlockSize(filename);
-  }
-    
-  /**
-   * Check whether the replication parameter is within the range
-   * determined by system configuration.
-   */
-  private void verifyReplication(String src, 
-                                 short replication, 
-                                 String clientName 
-                                 ) throws IOException {
-    String text = "file " + src 
-      + ((clientName != null) ? " on client " + clientName : "")
-      + ".\n"
-      + "Requested replication " + replication;
-
-    if (replication > maxReplication)
-      throw new IOException(text + " exceeds maximum " + maxReplication);
-      
-    if (replication < minReplication)
-      throw new IOException( 
-                            text + " is less than the required minimum " + minReplication);
-  }
-
-  void startFile(String src, String holder, String clientMachine, 
-                 boolean overwrite, short replication, long blockSize
-                ) throws IOException {
-    startFileInternal(src, holder, clientMachine, overwrite,
-                      replication, blockSize);
-    getEditLog().logSync();
-  }
-
-  /**
-   * The client would like to create a new file with the indicated
-   * filename.  Verifies the replication factor, registers a lease for
-   * the client, and adds the file, initially with no blocks, to the
-   * namespace.
-   * @throws IOException if the filename is invalid
-   *         {@link FSDirectory#isValidToCreate(String)}.
-   */
-  synchronized void startFileInternal(String src, 
-                                              String holder, 
-                                              String clientMachine, 
-                                              boolean overwrite,
-                                              short replication,
-                                              long blockSize
-                                             	) throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
-                                  +src+" for "+holder+" at "+clientMachine);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create file" + src, safeMode);
-    if (!isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);      	  
-    }
-    try {
-      INode myFile = dir.getFileINode(src);
-      if (myFile != null && myFile.isUnderConstruction()) {
-        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
-        //
-        // If the file is under construction, then it must be in our
-        // leases. Find the appropriate lease record.
-        //
-        Lease lease = getLease(holder);
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease != null) {
-          throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because current leaseholder is trying to recreate file.");
-        }
-        //
-        // Find the original holder.
-        //
-        lease = getLease(pendingFile.getClientName());
-        if (lease == null) {
-          throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because pendingCreates is non-null but no leases found.");
-        }
-        //
-        // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then reclaim all resources and allow this request 
-        // to proceed. Otherwise, prevent this request from creating file.
-        //
-        if (lease.expiredSoftLimit()) {
-          synchronized (sortedLeases) {
-            lease.releaseLocks();
-            removeLease(lease.getHolder());
-            LOG.info("startFile: Removing lease " + lease + " ");
-            if (!sortedLeases.remove(lease)) {
-              LOG.error("startFile: Unknown failure trying to remove " + lease + 
-                        " from lease set.");
-            }
-          }
-        } else {
-          throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 ", because this file is already being created by " +
-                                                 pendingFile.getClientName() + 
-                                                 " on " + pendingFile.getClientMachine());
-        }
-      }
-
-      try {
-        verifyReplication(src, replication, clientMachine);
-      } catch(IOException e) {
-        throw new IOException("failed to create "+e.getMessage());
-      }
-      if (!dir.isValidToCreate(src)) {
-        if (overwrite) {
-          delete(src);
-        } else {
-          throw new IOException("failed to create file " + src 
-                                +" on client " + clientMachine
-                                +" either because the filename is invalid or the file exists");
-        }
-      }
-
-      DatanodeDescriptor clientNode = 
-        host2DataNodeMap.getDatanodeByHost(clientMachine);
-
-      synchronized (sortedLeases) {
-        Lease lease = getLease(holder);
-        if (lease == null) {
-          lease = new Lease(holder);
-          putLease(holder, lease);
-          sortedLeases.add(lease);
-        } else {
-          sortedLeases.remove(lease);
-          lease.renew();
-          sortedLeases.add(lease);
-        }
-        lease.startedCreate(src);
-      }
-
-      //
-      // Now we can add the name to the filesystem. This file has no
-      // blocks associated with it.
-      //
-      INode newNode = dir.addFile(src, replication, blockSize,
-                                  holder, 
-                                  clientMachine, 
-                                  clientNode);
-      if (newNode == null) {
-        throw new IOException("DIR* NameSystem.startFile: " +
-                              "Unable to add file to namespace.");
-      }
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
-                                   +ie.getMessage());
-      throw ie;
-    }
-
-    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
-                                  +"add "+src+" to namespace for "+holder);
-  }
-
-  /**
-   * The client would like to obtain an additional block for the indicated
-   * filename (which is being written-to).  Return an array that consists
-   * of the block, plus a set of machines.  The first on this list should
-   * be where the client writes data.  Subsequent items in the list must
-   * be provided in the connection to the first datanode.
-   *
-   * Make sure the previous blocks have been reported by datanodes and
-   * are replicated; otherwise a NotReplicatedYetException tells the
-   * client to "try again later".
-   */
-  public LocatedBlock getAdditionalBlock(String src, 
-                                         String clientName
-                                         ) throws IOException {
-    long fileLength, blockSize;
-    int replication;
-    DatanodeDescriptor clientNode = null;
-    Block newBlock = null;
-
-    NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
-                                  +src+" for "+clientName);
-
-    synchronized (this) {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add block to " + src, safeMode);
-      }
-
-      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
-
-      //
-      // If we fail this, bad things happen!
-      //
-      if (!checkFileProgress(pendingFile, false)) {
-        throw new NotReplicatedYetException("Not replicated yet: " + src);
-      }
-      fileLength = pendingFile.computeContentsLength();
-      blockSize = pendingFile.getPreferredBlockSize();
-      clientNode = pendingFile.getClientNode();
-      replication = (int)pendingFile.getReplication();
-      newBlock = allocateBlock(src, pendingFile);
-    }
-
-    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                           clientNode,
-                                                           null,
-                                                           blockSize);
-    if (targets.length < this.minReplication) {
-      // if we could not find any targets, remove this block from file
-      synchronized (this) {
-        INodeFile iFile = dir.getFileINode(src);
-        if (iFile != null && iFile.isUnderConstruction()) {
-          INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-          if (pendingFile.getClientName().equals(clientName)) {
-            dir.removeBlock(src, pendingFile, newBlock);
-          }
-        }
-      }
-      throw new IOException("File " + src + " could only be replicated to " +
-                            targets.length + " nodes, instead of " +
-                            minReplication);
-    }
-        
-    // Create next block
-    return new LocatedBlock(newBlock, targets, fileLength);
-  }
-
-  /**
-   * The client would like to let go of the given block
-   */
-  public synchronized boolean abandonBlock(Block b, String src, String holder
-      ) throws IOException {
-    //
-    // Remove the block from the pending creates list
-    //
-    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                  +b.getBlockName()+"of file "+src);
-    INode file = checkLease(src, holder);
-    dir.removeBlock(src, file, b);
-    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    + b.getBlockName()
-                                    + " is removed from pendingCreates");
-    return true;
-  }
-  
-  // make sure that we still have the lease on this file
-  private INodeFileUnderConstruction checkLease(String src, String holder
-      ) throws IOException {
-    INode file = dir.getFileINode(src);
-    if (file == null || !file.isUnderConstruction()) {
-      throw new LeaseExpiredException("No lease on " + src);
-    }
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
-    if (!pendingFile.getClientName().equals(holder)) {
-      throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
-          + pendingFile.getClientName() + " but is accessed by " + holder);
-    }
-    return pendingFile;    
-  }
-
-  /**
-   * Abandon the entire file in progress
-   */
-  public synchronized void abandonFileInProgress(String src, 
-                                                 String holder
-                                                 ) throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src);
-    synchronized (sortedLeases) {
-      // find the lease
-      Lease lease = getLease(holder);
-      if (lease != null) {
-        // remove the file from the lease
-        if (lease.completedCreate(src)) {
-          // if we found the file in the lease, remove it from pendingCreates
-          internalReleaseCreate(src, holder);
-        } else {
-          LOG.info("Attempt by " + holder + 
-                   " to release someone else's create lock on " + src);
-        }
-      } else {
-        LOG.info("Attempt to release a lock from an unknown lease holder "
-                 + holder + " for " + src);
-      }
-    }
-  }
-
-  /**
-   * The FSNamesystem will already know the blocks that make up the file.
-   * Before we return, we make sure that all the file's blocks have 
-   * been reported by datanodes and are replicated correctly.
-   */
-  public int completeFile(String src, String holder) throws IOException {
-    int status = completeFileInternal(src, holder);
-    getEditLog().logSync();
-    return status;
-  }
-
-  private synchronized int completeFileInternal(String src, 
-                                                String holder) throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot complete file " + src, safeMode);
-    INode iFile = dir.getFileINode(src);
-    INodeFileUnderConstruction pendingFile = null;
-    Block[] fileBlocks = null;
-
-    if (iFile != null && iFile.isUnderConstruction()) {
-      pendingFile = (INodeFileUnderConstruction) iFile;
-      fileBlocks =  dir.getFileBlocks(src);
-    }
-    if (fileBlocks == null ) {    
-      NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
-                                   + "failed to complete " + src
-                                   + " because dir.getFileBlocks() is null " + 
-                                   " and pendingFile is " + 
-                                   ((pendingFile == null) ? "null" : 
-                                     ("from " + pendingFile.getClientMachine()))
-                                  );                      
-      return OPERATION_FAILED;
-    } else if (!checkFileProgress(pendingFile, true)) {
-      return STILL_WAITING;
-    }
-        
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
-
-    // persist block allocations for this file
-    dir.persistBlocks(src, newFile);
-
-    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
-                                  + " blocklist persisted");
-
-    synchronized (sortedLeases) {
-      Lease lease = getLease(holder);
-      if (lease != null) {
-        lease.completedCreate(src);
-        if (!lease.hasLocks()) {
-          removeLease(holder);
-          sortedLeases.remove(lease);
-        }
-      }
-    }
-
-    //
-    // REMIND - mjc - this should be done only after we wait a few secs.
-    // The namenode isn't giving datanodes enough time to report the
-    // replicated blocks that are automatically done as part of a client
-    // write.
-    //
-
-    // Now that the file is real, we need to be sure to replicate
-    // the blocks.
-    int numExpectedReplicas = pendingFile.getReplication();
-    Block[] pendingBlocks = pendingFile.getBlocks();
-    int nrBlocks = pendingBlocks.length;
-    for (int i = 0; i < nrBlocks; i++) {
-      // filter out containingNodes that are marked for decommission.
-      NumberReplicas number = countNodes(pendingBlocks[i]);
-      if (number.liveReplicas() < numExpectedReplicas) {
-        neededReplications.add(pendingBlocks[i], 
-                               number.liveReplicas(), 
-                               number.decommissionedReplicas,
-                               numExpectedReplicas);
-      }
-    }
-    return COMPLETE_SUCCESS;
-  }
-
-  static Random randBlockId = new Random();
-    
-  /**
-   * Allocate a new block for the given pending file
-   */
-  private Block allocateBlock(String src, INode file) throws IOException {
-    Block b = null;
-    do {
-      b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
-    } while (isValidBlock(b));
-    b = dir.addBlock(src, file, b);
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
-                                 +src+ ". "+b.getBlockName());
-    return b;
-  }
-
-  /**
-   * Check that the indicated file's blocks are present and
-   * replicated.  If not, return false.  If checkall is true, check
-   * all blocks; otherwise check only the penultimate block.
-   */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
-    if (checkall) {
-      //
-      // check all blocks of the file.
-      //
-      for (Block block: v.getBlocks()) {
-        if (blocksMap.numNodes(block) < this.minReplication) {
-          return false;
-        }
-      }
-    } else {
-      //
-      // check the penultimate block of this file
-      //
-      Block b = v.getPenultimateBlock();
-      if (b != null) {
-        if (blocksMap.numNodes(b) < this.minReplication) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Adds a block to the list of blocks to be invalidated on the
-   * specified datanode.
-   */
-  private void addToInvalidates(Block b, DatanodeInfo n) {
-    Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
-    if (invalidateSet == null) {
-      invalidateSet = new ArrayList<Block>();
-      recentInvalidateSets.put(n.getStorageID(), invalidateSet);
-    }
-    invalidateSet.add(b);
-  }
-
-  /**
-   * dumps the contents of recentInvalidateSets
-   */
-  private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
-    Collection<Collection<Block>> values = recentInvalidateSets.values();
-    Iterator<Map.Entry<String,Collection<Block>>> it = 
-      recentInvalidateSets.entrySet().iterator();
-    if (values.size() == 0) {
-      out.println("Metasave: Blocks waiting deletion: 0");
-      return;
-    }
-    out.println("Metasave: Blocks waiting deletion from " +
-                values.size() + " datanodes.");
-    while (it.hasNext()) {
-      Map.Entry<String,Collection<Block>> entry = it.next();
-      String storageId = entry.getKey();
-      DatanodeDescriptor node = datanodeMap.get(storageId);
-      Collection<Block> blklist = entry.getValue();
-      if (blklist.size() > 0) {
-        out.print(node.getName());
-        for (Iterator<Block> jt = blklist.iterator(); jt.hasNext();) {
-          Block block = jt.next();
-          out.print(" " + block);
-        }
-        out.println("");
-      }
-    }
-  }
-
-  /**
-   * Invalidates the given block on the given datanode.
-   */
-  public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
-    throws IOException {
-    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
-                                 + blk.getBlockName() + " on " 
-                                 + dn.getName());
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
-    }
-
-    // Check how many copies we have of the block.  If we have at least one
-    // copy on a live node, then we can delete it. 
-    int count = countNodes(blk).liveReplicas();
-    if (count > 1) {
-      addToInvalidates(blk, dn);
-      removeStoredBlock(blk, getDatanode(dn));
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk.getBlockName() + " on " 
-                                   + dn.getName() + " listed for deletion.");
-    } else {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk.getBlockName() + " on " 
-                                   + dn.getName() + " is the only copy and was not deleted.");
-    }
-  }
-
-  ////////////////////////////////////////////////////////////////
-  // Here's how to handle block-copy failure during client write:
-  // -- As usual, the client's write should result in a streaming
-  // backup write to a k-machine sequence.
-  // -- If one of the backup machines fails, no worries.  Fail silently.
-  // -- Before client is allowed to close and finalize file, make sure
-  // that the blocks are backed up.  Namenode may have to issue specific backup
-  // commands to make up for earlier datanode failures.  Once all copies
-  // are made, edit namespace and return to client.
-  ////////////////////////////////////////////////////////////////
-
-  public boolean renameTo(String src, String dst) throws IOException {
-    boolean status = renameToInternal(src, dst);
-    getEditLog().logSync();
-    return status;
-  }
-
-  /**
-   * Change the indicated filename.
-   */
-  public synchronized boolean renameToInternal(String src, String dst) throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot rename " + src, safeMode);
-    if (!isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
-    return dir.renameTo(src, dst);
-  }
-
-  /**
-   * Remove the indicated filename from the namespace.  This may
-   * invalidate some blocks that make up the file.
-   */
-  public boolean delete(String src) throws IOException {
-    boolean status = deleteInternal(src, true);
-    getEditLog().logSync();
-    return status;
-  }
-
-  /**
-   * An internal delete function that does not enforce safe mode
-   */
-  boolean deleteInSafeMode(String src) throws IOException {
-    boolean status = deleteInternal(src, false);
-    getEditLog().logSync();
-    return status;
-  }
-  /**
-   * Remove the indicated filename from the namespace.  This may
-   * invalidate some blocks that make up the file.
-   */
-  private synchronized boolean deleteInternal(String src, 
-                                              boolean enforceSafeMode) 
-                                              throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
-    if (enforceSafeMode && isInSafeMode())
-      throw new SafeModeException("Cannot delete " + src, safeMode);
-    Block deletedBlocks[] = dir.delete(src);
-    if (deletedBlocks != null) {
-      for (int i = 0; i < deletedBlocks.length; i++) {
-        Block b = deletedBlocks[i];
-                
-        for (Iterator<DatanodeDescriptor> it = 
-               blocksMap.nodeIterator(b); it.hasNext();) {
-          DatanodeDescriptor node = it.next();
-          addToInvalidates(b, node);
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
-                                        + b.getBlockName() + " is added to invalidSet of " 
-                                        + node.getName());
-        }
-      }
-    }
-
-    return (deletedBlocks != null);
-  }
-
-  /**
-   * Return whether the given filename exists
-   */
-  public boolean exists(String src) {
-    return dir.getFileBlocks(src) != null || dir.isDir(src);
-  }
-
-  /**
-   * Whether the given name is a directory
-   */
-  public boolean isDir(String src) {
-    return dir.isDir(src);
-  }
-
-  /* Get the file info for a specific file.
-   * @param src The string representation of the path to the file
-   * @throws IOException if file does not exist
-   * @return object containing information regarding the file
-   */
-  DFSFileInfo getFileInfo(String src) throws IOException {
-    return dir.getFileInfo(src);
-  }
-
-  /**
-   * Whether the pathname is valid.  Currently prohibits relative paths,
-   * path components equal to "." or "..", and components containing
-   * a ":" or "/".
-   */
-  static boolean isValidName(String src) {
-      
-    // Path must be absolute.
-    if (!src.startsWith(Path.SEPARATOR)) {
-      return false;
-    }
-      
-    // Check for ".." "." ":" "/"
-    StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
-    while(tokens.hasMoreTokens()) {
-      String element = tokens.nextToken();
-      if (element.equals("..") || 
-          element.equals(".")  ||
-          (element.indexOf(":") >= 0)  ||
-          (element.indexOf("/") >= 0)) {
-        return false;
-      }
-    }
-    return true;
-  }
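  // Illustrative inputs for the rule above (paths assumed):
  //   isValidName("/user/data/part-0")  -> true   (absolute, clean components)
  //   isValidName("user/data")          -> false  (relative path)
  //   isValidName("/user/../secret")    -> false  (".." component)
  //   isValidName("/logs/a:b")          -> false  (":" inside a component)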
-  /**
-   * Create all the necessary directories
-   */
-  public boolean mkdirs(String src) throws IOException {
-    boolean status = mkdirsInternal(src);
-    getEditLog().logSync();
-    return status;
-  }
-    
-  /**
-   * Create all the necessary directories
-   */
-  private synchronized boolean mkdirsInternal(String src) throws IOException {
-    boolean    success;
-    NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create directory " + src, safeMode);
-    if (!isValidName(src)) {
-      throw new IOException("Invalid directory name: " + src);
-    }
-    success = dir.mkdirs(src, now());
-    if (!success) {
-      throw new IOException("Invalid directory name: " + src);
-    }
-    return success;
-  }
-
-  /* Get the size of the specified directory subtree.
-   * @param src The string representation of the path
-   * @throws IOException if path does not exist
-   * @return size in bytes
-   */
-  long getContentLength(String src) throws IOException {
-    return dir.getContentLength(src);
-  }
-
-  /************************************************************
-   * A Lease governs all the locks held by a single client.
-   * For each client there's a corresponding lease, whose
-   * timestamp is updated when the client periodically
-   * checks in.  If the client dies and allows its lease to
-   * expire, all the corresponding locks can be released.
-   *************************************************************/
-  class Lease implements Comparable<Lease> {
-    private StringBytesWritable holder;
-    private long lastUpdate;
-    private Collection<StringBytesWritable> locks = new TreeSet<StringBytesWritable>();
-    private Collection<StringBytesWritable> creates = new TreeSet<StringBytesWritable>();
-
-    public Lease(String holder) throws IOException {
-      this.holder = new StringBytesWritable(holder);
-      renew();
-    }
-    public void renew() {
-      this.lastUpdate = now();
-    }
-    /**
-     * Returns true if the Hard Limit Timer has expired
-     */
-    public boolean expiredHardLimit() {
-      return now() - lastUpdate > LEASE_HARDLIMIT_PERIOD;
-    }
-    /**
-     * Returns true if the Soft Limit Timer has expired
-     */
-    public boolean expiredSoftLimit() {
-      return now() - lastUpdate > LEASE_SOFTLIMIT_PERIOD;
-    }
-    public void obtained(String src) throws IOException {
-      locks.add(new StringBytesWritable(src));
-    }
-    public void released(String src) throws IOException {
-      locks.remove(new StringBytesWritable(src));
-    }
-    public void startedCreate(String src) throws IOException {
-      creates.add(new StringBytesWritable(src));
-    }
-    public boolean completedCreate(String src) throws IOException {
-      return creates.remove(new StringBytesWritable(src));
-    }
-    public boolean hasLocks() {
-      return (locks.size() + creates.size()) > 0;
-    }
-    public void releaseLocks() throws IOException {
-      String holderStr = holder.getString();
-      locks.clear();
-      for (Iterator<StringBytesWritable> it = creates.iterator(); it.hasNext();)
-        internalReleaseCreate(it.next().getString(), holderStr);
-      creates.clear();
-    }
-
-    /** A short description of this lease, for logging. */
-    public String toString() {
-      return "[Lease.  Holder: " + holder.toString() + ", heldlocks: " +
-        locks.size() + ", pendingcreates: " + creates.size() + "]";
-    }
-
-    /** Orders leases by last-update time, oldest first; ties broken by holder. */
-    public int compareTo(Lease o) {
-      Lease l1 = this;
-      Lease l2 = o;
-      long lu1 = l1.lastUpdate;
-      long lu2 = l2.lastUpdate;
-      if (lu1 < lu2) {
-        return -1;
-      } else if (lu1 > lu2) {
-        return 1;
-      } else {
-        return l1.holder.compareTo(l2.holder);
-      }
-    }
-
-    public boolean equals(Object o) {
-      if (!(o instanceof Lease)) {
-        return false;
-      }
-      Lease obj = (Lease) o;
-      return lastUpdate == obj.lastUpdate && holder.equals(obj.holder);
-    }
-
-    public int hashCode() {
-      return holder.hashCode();
-    }
-    
-    String getHolder() throws IOException {
-      return holder.getString();
-    }
-  }
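-
-  /* Hedged sketch (not part of the DFS API): because Lease.compareTo
-   * orders leases by lastUpdate, oldest first, expired leases cluster at
-   * the head of a sorted set.  A monitor therefore only has to inspect
-   * the first entry and can stop at the first unexpired lease, which is
-   * exactly what LeaseMonitor below does.
-   */
-  static void reapExpired(SortedSet<Lease> sorted) throws IOException {
-    while (!sorted.isEmpty()) {
-      Lease oldest = sorted.first();
-      if (!oldest.expiredHardLimit()) {
-        break;               // every later lease is newer; nothing to reap
-      }
-      oldest.releaseLocks(); // dispose of the expired lease's locks
-      sorted.remove(oldest);
-    }
-  }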
-  
-  /******************************************************
-   * LeaseMonitor checks for leases that have expired,
-   * and disposes of them.
-   ******************************************************/
-  class LeaseMonitor implements Runnable {
-    public void run() {
-      try {
-        while (fsRunning) {
-          synchronized (FSNamesystem.this) {
-            synchronized (sortedLeases) {
-              Lease top;
-              while ((sortedLeases.size() > 0) &&
-                     ((top = sortedLeases.first()) != null)) {
-                if (top.expiredHardLimit()) {
-                  top.releaseLocks();
-                  leases.remove(top.holder);
-                  LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
-                  if (!sortedLeases.remove(top)) {
-                    LOG.info("Unknown failure trying to remove " + top + " from lease set.");
-                  }
-                } else {
-                  break;
-                }
-              }
-            }
-          }
-          try {
-            Thread.sleep(2000);
-          } catch (InterruptedException ie) {
-          }
-        }
-      } catch (Exception e) {
-        FSNamesystem.LOG.error(StringUtils.stringifyException(e));
-      }
-    }
-  }
-  
-  private Lease getLease(String holder) throws IOException {
-    return leases.get(new StringBytesWritable(holder));
-  }
-  
-  private void putLease(String holder, Lease lease) throws IOException {
-    leases.put(new StringBytesWritable(holder), lease);
-  }
-  
-  private void removeLease(String holder) throws IOException {
-    leases.remove(new StringBytesWritable(holder));
-  }
-
-  /**
-   * Move a file that is being written to be immutable.
-   * @param src The filename
-   * @param holder The datanode that was creating the file
-   */
-  private void internalReleaseCreate(String src, String holder) throws IOException {
-    INodeFile iFile = dir.getFileINode(src);
-    if (iFile == null) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
-                                   + "attempt to release a create lock on "
-                                   + src + " file does not exist.");
-      return;
-    }
-    if (!iFile.isUnderConstruction()) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
-                                   + "attempt to release a create lock on "
-                                   + src + " but file is already closed.");
-      return;
-    }
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
-
-    // The last block that was allocated might not have been used by the
-    // client. In this case, the size of the last block would be 0. An fsck
-    // will report this block as a missing block because no datanodes have it.
-    // Delete this block.
-    Block[] blocks = pendingFile.getBlocks();
-    if (blocks != null && blocks.length > 1) {
-      Block last = blocks[blocks.length - 1];
-      if (last.getNumBytes() == 0) {
-          pendingFile.removeBlock(last);
-      }
-    }
-
-    // The file is no longer pending.
-    // Create permanent INode, update blockmap
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
-
-    // persist block allocations for this file
-    dir.persistBlocks(src, newFile);
-  
-    NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + 
-                                  src + " is no longer written to by " + 
-                                  holder);
-  }
-
-  /**
-   * Renew the lease(s) held by the given client
-   */
-  public void renewLease(String holder) throws IOException {
-    synchronized (sortedLeases) {
-      if (isInSafeMode())
-        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-      Lease lease = getLease(holder);
-      if (lease != null) {
-        sortedLeases.remove(lease);
-        lease.renew();
-        sortedLeases.add(lease);
-      }
-    }
-  }
-
-  /**
-   * Get a listing of all files at 'src'.  The Object[] array
-   * exists so we can return file attributes (soon to be implemented)
-   */
-  public DFSFileInfo[] getListing(String src) {
-    return dir.getListing(src);
-  }
-
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by datanodes
-  //
-  /////////////////////////////////////////////////////////
-  /**
-   * Register Datanode.
-   * <p>
-   * The purpose of registration is to identify whether the new datanode
-   * serves a new data storage, and will report new data block copies,
-   * which the namenode was not aware of; or the datanode is a replacement
-   * node for the data storage that was previously served by a different
-   * or the same (in terms of host:port) datanode.
-   * The data storages are distinguished by their storageIDs. When a new
-   * data storage is reported the namenode issues a new unique storageID.
-   * <p>
-   * Finally, the namenode returns its namespaceID as the registrationID
-   * for the datanodes. 
-   * namespaceID is a persistent attribute of the name space.
-   * The registrationID is checked every time the datanode is communicating
-   * with the namenode. 
-   * Datanodes with inappropriate registrationID are rejected.
-   * If the namenode stops and then restarts, it can restore its
-   * namespaceID and will continue serving the datanodes that have
-   * previously registered with it, without restarting the whole cluster.
-   * 
-   * @see DataNode#register()
-   */
-  public synchronized void registerDatanode(DatanodeRegistration nodeReg,
-                                            String networkLocation
-                                            ) throws IOException {
-
-    if (!verifyNodeRegistration(nodeReg)) {
-      throw new DisallowedDatanodeException(nodeReg);
-    }
-
-    String dnAddress = Server.getRemoteAddress();
-    if (dnAddress == null) {
-      // This mostly happens when the method is not invoked inside an RPC.
-      throw new IOException("Could not find remote address for " +
-                            "registration from " + nodeReg.getName());
-    }      
-
-    String hostName = nodeReg.getHost();
-      
-    // update the datanode's name with ip:port
-    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
-                                      nodeReg.getStorageID(),
-                                      nodeReg.getInfoPort());
-    nodeReg.updateRegInfo(dnReg);
-      
-    NameNode.stateChangeLog.info(
-                                 "BLOCK* NameSystem.registerDatanode: "
-                                 + "node registration from " + nodeReg.getName()
-                                 + " storage " + nodeReg.getStorageID());
-
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
-      
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
-                        + "node from name: " + nodeN.getName());
-      // nodeN previously served a different data storage, 
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data 
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
-                                      + "node restarted.");
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have the same storageID
-          by (insanely rare) random chance. The user needs to restart one
-          of the nodes with its data cleared, or simply remove the StorageID
-          value in the "VERSION" file under the data directory of the
-          datanode; but that might not work if the VERSION file format has
-          changed.
-       */
-        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
-                                      + "node " + nodeS.getName()
-                                      + " is replaced by " + nodeReg.getName() + 
-                                      " with the same storageID " +
-                                      nodeReg.getStorageID());
-      }
-      // update cluster map
-      clusterMap.remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setNetworkLocation(networkLocation);
-      clusterMap.add(nodeS);
-      nodeS.setHostName(hostName);
-        
-      // also treat the registration message as a heartbeat
-      synchronized(heartbeats) {
-        if (!heartbeats.contains(nodeS)) {
-          heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0);
-          nodeS.isAlive = true;
-        }
-      }
-      return;
-    } 
-
-    // this is a new datanode serving a new data storage
-    if (nodeReg.getStorageID().equals("")) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.storageID = newStorageID();
-      NameNode.stateChangeLog.debug(
-                                    "BLOCK* NameSystem.registerDatanode: "
-                                    + "new storageID " + nodeReg.getStorageID() + " assigned.");
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, networkLocation, hostName);
-    unprotectedAddDatanode(nodeDescr);
-    clusterMap.add(nodeDescr);
-      
-    // also treat the registration message as a heartbeat
-    synchronized(heartbeats) {
-      heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because it is done when the descriptor is created
-    }
-    return;
-  }
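-
-  /* Hedged summary of the cases registerDatanode handles above, writing
-   * nodeS for the lookup by storageID and nodeN for the lookup by
-   * host:port name (a restatement of the code, not additional behavior):
-   *
-   *   nodeN == null && nodeS == null : brand-new node and storage;
-   *                                    a storageID is assigned if missing.
-   *   nodeN == nodeS != null         : the same node restarted; its blocks
-   *                                    are kept, and the next block report
-   *                                    supplies the delta.
-   *   nodeS != null && nodeN != nodeS: replacement node for an existing
-   *                                    storage; nodeS's info is re-homed.
-   *   nodeN != null && nodeN != nodeS: nodeN's old storage is orphaned;
-   *                                    it is removed and wiped first.
-   */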
-    
-  /**
-   * Get registrationID for datanodes based on the namespaceID.
-   * 
-   * @see #registerDatanode(DatanodeRegistration,String)
-   * @see FSImage#newNamespaceID()
-   * @return registration ID
-   */
-  public String getRegistrationID() {
-    return Storage.getRegistrationID(dir.fsImage);
-  }
-    
-  /**
-   * Generate new storage ID.
-   * 
-   * @return unique storage ID
-   * 
-   * Note that collisions are still possible if somebody tries
-   * to bring in a data storage from a different cluster.
-   */
-  private String newStorageID() {
-    String newID = null;
-    while(newID == null) {
-      newID = "DS" + Integer.toString(r.nextInt());
-      if (datanodeMap.get(newID) != null)
-        newID = null;
-    }
-    return newID;
-  }
-    
-  private boolean isDatanodeDead(DatanodeDescriptor node) {
-    return (node.getLastUpdate() <
-            (now() - heartbeatExpireInterval));
-  }
-    
-  void setDatanodeDead(DatanodeID nodeID) throws IOException {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    node.setLastUpdate(0);
-  }
-
-  /**
-   * The given node has reported in.  This method should:
-   * 1) Record the heartbeat, so the datanode isn't timed out
-   * 2) Adjust usage stats for future block allocation
-   * 
-   * If a substantial amount of time has passed since the last datanode
-   * heartbeat, then request an immediate block report.
-   * 
-   * @return true if registration is required, false otherwise.
-   * @throws IOException
-   */
-  public boolean gotHeartbeat(DatanodeID nodeID,
-                              long capacity,
-                              long dfsUsed,
-                              long remaining,
-                              int xceiverCount,
-                              int xmitsInProgress,
-                              Object[] xferResults,
-                              Object deleteList[]
-                              ) throws IOException {
-    synchronized (heartbeats) {
-      synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo;
-        try {
-          nodeinfo = getDatanode(nodeID);
-          if (nodeinfo == null) {
-            return true;
-          }
-        } catch(UnregisteredDatanodeException e) {
-          return true;
-        }
-          
-        // Check if this datanode should actually be shutdown instead. 
-        if (shouldNodeShutdown(nodeinfo)) {
-          setDatanodeDead(nodeinfo);
-          throw new DisallowedDatanodeException(nodeinfo);
-        }
-
-        if (!nodeinfo.isAlive) {
-          return true;
-        } else {
-          updateStats(nodeinfo, false);
-          nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-          updateStats(nodeinfo, true);
-          //
-          // Extract pending replication work or block invalidation
-          // work from the datanode descriptor
-          //
-          nodeinfo.getReplicationSets(this.maxReplicationStreams - 
-                                      xmitsInProgress, xferResults); 
-          if (xferResults[0] == null) {
-            nodeinfo.getInvalidateBlocks(FSConstants.BLOCK_INVALIDATE_CHUNK,
-                                         deleteList);
-          }
-          return false;
-        }
-      }
-    }
-  }
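-
-  /* Hedged reading of the contract above (a restatement, not new
-   * behavior): a true return asks the datanode to (re)register; on a
-   * false return, xferResults and deleteList carry at most one kind of
-   * work, replication being computed first and block invalidation only
-   * when no transfers were scheduled.
-   */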
-
-  private void updateStats(DatanodeDescriptor node, boolean isAdded) {
-    //
-    // The statistics are protected by the heartbeat lock
-    //
-    assert(Thread.holdsLock(heartbeats));
-    if (isAdded) {
-      totalCapacity += node.getCapacity();
-      totalUsed += node.getDfsUsed();
-      totalRemaining += node.getRemaining();
-      totalLoad += node.getXceiverCount();
-    } else {
-      totalCapacity -= node.getCapacity();
-      totalUsed -= node.getDfsUsed();
-      totalRemaining -= node.getRemaining();
-      totalLoad -= node.getXceiverCount();
-    }
-  }
-
-  /**
-   * Periodically calls heartbeatCheck().
-   */
-  class HeartbeatMonitor implements Runnable {
-    /** Checks heartbeats, then sleeps for heartbeatRecheckInterval. */
-    public void run() {
-      while (fsRunning) {
-        try {
-          heartbeatCheck();
-        } catch (Exception e) {
-          FSNamesystem.LOG.error(StringUtils.stringifyException(e));
-        }
-        try {
-          Thread.sleep(heartbeatRecheckInterval);
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
-
-  /**
-   * Periodically calls computeReplicationWork().
-   */
-  class ReplicationMonitor implements Runnable {
-    public void run() {
-      while (fsRunning) {
-        try {
-          computeDatanodeWork();
-          processPendingReplications();
-          Thread.sleep(replicationRecheckInterval);
-        } catch (InterruptedException ie) {
-        } catch (IOException ie) {
-          LOG.warn("ReplicationMonitor thread received exception. " + ie);
-        } catch (Throwable t) {
-          LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
-          Runtime.getRuntime().exit(-1);
-        }
-      }
-    }
-  }
-
-  /**
-   * Look at a few datanodes and compute any replication work that 
-   * can be scheduled on them. The datanode will be informed of this
-   * work at the next heartbeat.
-   */
-  void computeDatanodeWork() throws IOException {
-    int numiter = 0;
-    int foundwork = 0;
-    int hsize = 0;
-    int lastReplIndex = -1;
-
-    while (true) {
-      DatanodeDescriptor node = null;
-
-      //
-      // pick the datanode that was the last one in the
-      // previous invocation of this method.
-      //
-      synchronized (heartbeats) {
-        hsize = heartbeats.size();
-        if (numiter++ >= hsize) {
-          // no change in replIndex.
-          if (lastReplIndex >= 0) {
-            //next time, start after where the last replication was scheduled
-            replIndex = lastReplIndex;
-          }
-          break;
-        }
-        if (replIndex >= hsize) {
-          replIndex = 0;
-        }
-        node = heartbeats.get(replIndex);
-        replIndex++;
-      }
-
-      //
-      // Is there replication work to be computed for this datanode?
-      //
-      int precomputed = node.getNumberOfBlocksToBeReplicated();
-      int needed = this.maxReplicationStreams - precomputed;
-      boolean doReplication = false;
-      boolean doInvalidation = false;
-      if (needed > 0) {
-        //
-        // Compute replication work and store work into the datanode
-        //
-        Object replsets[] = pendingTransfers(node, needed);
-        if (replsets != null) {
-          doReplication = true;
-          addBlocksToBeReplicated(node, (Block[])replsets[0], 
-                                  (DatanodeDescriptor[][])replsets[1]);
-          lastReplIndex = replIndex;
-        }
-      }
-      if (!doReplication) {
-        //
-        // Determine if block deletion is pending for this datanode
-        //
-        Block blocklist[] = blocksToInvalidate(node);
-        if (blocklist != null) {
-          doInvalidation = true;
-          addBlocksToBeInvalidated(node, blocklist);
-        }
-      }
-      if (doReplication || doInvalidation) {
-        //
-        // If we have already computed work for a predefined
-        // number of datanodes in this iteration, then relax
-        //
-        if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
-          break;
-        }
-        foundwork++;
-      } 
-    }
-  }
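-
-  /* A minimal, self-contained sketch of the cursor scheme above
-   * (illustrative names only, not the DFS API): a cursor persisted
-   * across calls makes successive invocations resume where the previous
-   * one stopped, and the scan ends early once work has been found for a
-   * bounded percentage of the nodes, mirroring REPL_WORK_PER_ITERATION.
-   */
-  static int roundRobinScan(boolean[] nodeHasWork, int cursor, int capPercent) {
-    int n = nodeHasWork.length;
-    int found = 0;
-    for (int i = 0; i < n; i++) {
-      int idx = cursor % n;
-      cursor = (cursor + 1) % n;
-      if (nodeHasWork[idx] && ++found > (n * capPercent) / 100) {
-        break;      // enough nodes have been given work in this round
-      }
-    }
-    return cursor;  // the resume point for the next invocation
-  }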
-
-  /**
-   * If there were any replication requests that timed out, reap them
-   * and put them back into the neededReplication queue
-   */
-  void processPendingReplications() {
-    Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
-    if (timedOutItems != null) {
-      synchronized (this) {
-        for (int i = 0; i < timedOutItems.length; i++) {
-          NumberReplicas num = countNodes(timedOutItems[i]);
-          neededReplications.add(timedOutItems[i], 
-                                 num.liveReplicas(),
-                                 num.decommissionedReplicas(),
-                                 getReplication(timedOutItems[i]));
-        }
-      }
-    }
-  }
-
-  /**
-   * Add more replication work for this datanode.
-   */
-  synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, 
-                                            Block[] blocklist,
-                                            DatanodeDescriptor[][] targets) 
-    throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeReplicated(blocklist, targets);
-    }
-  }
-
-  /**
-   * Add more block invalidation work for this datanode.
-   */
-  synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node, 
-                                             Block[] blocklist) throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeInvalidated(blocklist);
-    }
-  }
-
-  /**
-   * remove a datanode descriptor
-   * @param nodeID datanode ID
-   */
-  synchronized public void removeDatanode(DatanodeID nodeID) 
-    throws IOException {
-    DatanodeDescriptor nodeInfo = getDatanode(nodeID);
-    if (nodeInfo != null) {
-      removeDatanode(nodeInfo);
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                   + nodeID.getName() + " does not exist");
-    }
-  }
-  
-  /**
-   * remove a datanode descriptor
-   * @param nodeInfo datanode descriptor
-   */
-  private void removeDatanode(DatanodeDescriptor nodeInfo) {
-    synchronized (heartbeats) {
-      if (nodeInfo.isAlive) {
-        updateStats(nodeInfo, false);
-        heartbeats.remove(nodeInfo);
-        nodeInfo.isAlive = false;
-      }
-    }
-
-    for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
-      removeStoredBlock(it.next(), nodeInfo);
-    }
-    unprotectedRemoveDatanode(nodeInfo);
-    clusterMap.remove(nodeInfo);
-  }
-
-  void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
-    nodeDescr.resetBlocks();
-    NameNode.stateChangeLog.debug(
-                                  "BLOCK* NameSystem.unprotectedRemoveDatanode: "
-                                  + nodeDescr.getName() + " is out of service now.");
-  }
-    
-  void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
-    /* To keep host2DataNodeMap consistent with datanodeMap,
-       remove from host2DataNodeMap the datanodeDescriptor removed
-       from datanodeMap before adding nodeDescr to host2DataNodeMap.
-    */
-    host2DataNodeMap.remove(
-                            datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
-    host2DataNodeMap.add(nodeDescr);
-      
-    NameNode.stateChangeLog.debug(
-                                  "BLOCK* NameSystem.unprotectedAddDatanode: "
-                                  + "node " + nodeDescr.getName() + " is added to datanodeMap.");
-  }
-
-  /**
-   * Physically remove node from datanodeMap.
-   * 
-   * @param nodeID node
-   */
-  void wipeDatanode(DatanodeID nodeID) throws IOException {
-    String key = nodeID.getStorageID();
-    host2DataNodeMap.remove(datanodeMap.remove(key));
-    NameNode.stateChangeLog.debug(
-                                  "BLOCK* NameSystem.wipeDatanode: "
-                                  + nodeID.getName() + " storage " + key 
-                                  + " is removed from datanodeMap.");
-  }
-
-  FSImage getFSImage() {
-    return dir.fsImage;
-  }
-
-  FSEditLog getEditLog() {
-    return getFSImage().getEditLog();
-  }
-
-  /**
-   * Check if there are any expired heartbeats, and if so,
-   * whether any blocks have to be re-replicated.
-   * While removing dead datanodes, make sure that only one datanode is marked
-   * dead at a time within the synchronized section. Otherwise, a cascading
-   * effect causes more datanodes to be declared dead.
-   */
-  void heartbeatCheck() {
-    boolean allAlive = false;
-    while (!allAlive) {
-      boolean foundDead = false;
-      DatanodeID nodeID = null;
-
-      // locate the first dead node.
-      synchronized(heartbeats) {
-        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
-             it.hasNext();) {
-          DatanodeDescriptor nodeInfo = it.next();
-          if (isDatanodeDead(nodeInfo)) {
-            foundDead = true;
-            nodeID = nodeInfo;
-            break;
-          }
-        }
-      }
-
-      // acquire the fsnamesystem lock, and then remove the dead node.
-      if (foundDead) {
-        synchronized (this) {
-          synchronized(heartbeats) {
-            synchronized (datanodeMap) {
-              DatanodeDescriptor nodeInfo = null;
-              try {
-                nodeInfo = getDatanode(nodeID);
-              } catch (IOException e) {
-                nodeInfo = null;
-              }
-              if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
-                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-                                             + "lost heartbeat from " + nodeInfo.getName());
-                removeDatanode(nodeInfo);
-              }
-            }
-          }
-        }
-      }
-      allAlive = !foundDead;
-    }
-  }
-    
-  /**
-   * The given node is reporting all its blocks.  Use this info to 
-   * update the (machine-->blocklist) and (block-->machinelist) tables.
-   */
-  public synchronized Block[] processReport(DatanodeID nodeID, 
-                                            Block newReport[]
-                                            ) throws IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                    +"from "+nodeID.getName()+" "+newReport.length+" blocks");
-    }
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null) {
-      throw new IOException("ProcessReport from unregisterted node: "
-                            + nodeID.getName());
-    }
-
-    // Check if this datanode should actually be shutdown instead.
-    if (shouldNodeShutdown(node)) {
-      setDatanodeDead(node);
-      throw new DisallowedDatanodeException(node);
-    }
-
-    //
-    // Modify the (block-->datanode) map, according to the difference
-    // between the old and new block report.
-    //
-    Collection<Block> toAdd = new LinkedList<Block>();
-    Collection<Block> toRemove = new LinkedList<Block>();
-    node.reportDiff(blocksMap, newReport, toAdd, toRemove);
-        
-    for (Block b : toRemove) {
-      removeStoredBlock(b, node);
-    }
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, null);
-    }
-        
-    //
-    // We've now completely updated the node's block report profile.
-    // We now go through all its blocks and find which ones are invalid,
-    // no longer pending, or over-replicated.
-    //
-    // (Note it's not enough to just invalidate blocks at lease expiry 
-    // time; datanodes can go down before the client's lease on 
-    // the failed file expires and miss the "expire" event.)
-    //
-    // This function considers every block on a datanode, and thus
-    // should only be invoked infrequently.
-    //
-    Collection<Block> obsolete = new ArrayList<Block>();
-    for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
-      Block b = it.next();
-
-      // 
-      // A block report can only send BLOCK_INVALIDATE_CHUNK number of
-      // blocks to be deleted. If there are more blocks to be deleted, 
-      // they are added to recentInvalidateSets and will be sent out
-      // through succeeding heartbeat responses.
-      //
-      if (!isValidBlock(b)) {
-        if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) {
-          addToInvalidates(b, node);
-        } else {
-          obsolete.add(b);
-        }
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                      +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
-      }
-    }
-    return obsolete.toArray(new Block[obsolete.size()]);
-  }
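-
-  /* Hedged sketch of the report-diff idea used above (toy types; the
-   * real reportDiff works on BlocksMap and DatanodeDescriptor): compare
-   * the new report against the currently recorded set and derive the
-   * additions and removals instead of rebuilding the map from scratch.
-   */
-  static void sketchReportDiff(Set<Long> recorded, long[] newReport,
-                               Collection<Long> toAdd, Collection<Long> toRemove) {
-    Set<Long> reported = new HashSet<Long>();
-    for (long id : newReport) {
-      reported.add(id);
-      if (!recorded.contains(id)) {
-        toAdd.add(id);        // reported now but not recorded before
-      }
-    }
-    for (long id : recorded) {
-      if (!reported.contains(id)) {
-        toRemove.add(id);     // recorded before but missing from the report
-      }
-    }
-  }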
-
-  /**
-   * Modify (block-->datanode) map.  Remove block from set of 
-   * needed replications if this takes care of the problem.
-   * @return the block that is stored in blockMap.
-   */
-  synchronized Block addStoredBlock(Block block, 
-                                    DatanodeDescriptor node,
-                                    DatanodeDescriptor delNodeHint) {
-        
-    INodeFile fileINode = blocksMap.getINode(block);
-    int replication = (fileINode != null) ?  fileINode.getReplication() : 
-      defaultReplication;
-    boolean added = blocksMap.addNode(block, node, replication);
-        
-    Block storedBlock = blocksMap.getStoredBlock(block); // extra lookup!
-    if (storedBlock != null && block != storedBlock) {
-      if (block.getNumBytes() > 0) {
-        long cursize = storedBlock.getNumBytes();
-        if (cursize == 0) {
-          storedBlock.setNumBytes(block.getNumBytes());
-        } else if (cursize != block.getNumBytes()) {
-          LOG.warn("Inconsistent size for block " + block + 
-                   " reported from " + node.getName() + 
-                   " current size is " + cursize +
-                   " reported size is " + block.getNumBytes());
-          // Accept this block even if there is a problem with its
-          // size. Clients should detect data corruption because of
-          // CRC mismatch.
-        }
-      }
-      block = storedBlock;
-    }
-        
-    int curReplicaDelta = 0;
-        
-    if (added) {
-      curReplicaDelta = 1;
-      // 
-      // At startup time, so many new blocks come in that they would
-      // take up lots of space in the log file.
-      // So, we log only when the namenode is out of safemode.
-      //
-      if (!isInSafeMode()) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                      +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName());
-      }
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
-                                   + "Redundant addStoredBlock request received for " 
-                                   + block.getBlockName() + " on " + node.getName());
-    }
-
-    //
-    // if file is being actively written to, then do not check 
-    // replication-factor here. It will be checked when the file is closed.
-    //
-    if (fileINode == null || fileINode.isUnderConstruction()) {
-      return block;
-    }
-        
-    // filter out containingNodes that are marked for decommission.
-    NumberReplicas num = countNodes(block);
-    int numCurrentReplica = num.liveReplicas()
-      + pendingReplications.getNumReplicas(block);
-        
-    // check whether safe replication is reached for the block
-    // only if it is a part of a file
-    incrementSafeBlockCount(numCurrentReplica);
- 
-    // handle underReplication/overReplication
-    short fileReplication = fileINode.getReplication();
-    if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block, numCurrentReplica, 
-                                num.decommissionedReplicas(), fileReplication);
-    } else {
-      updateNeededReplications(block, curReplicaDelta, 0);
-    }
-    if (numCurrentReplica > fileReplication) {
-      processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
-    }
-    return block;
-  }
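-
-  /* Hedged summary of the bookkeeping above, writing live for
-   * liveReplicas() plus pending replications and target for the file's
-   * replication factor (a restatement of the code, not new behavior):
-   *
-   *   live >= target : the block no longer needs replication; it is
-   *                    dropped from neededReplications.
-   *   live <  target : neededReplications is updated with the delta.
-   *   live >  target : surplus replicas are additionally marked excess
-   *                    so that they can be invalidated.
-   */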
-    
-  /**
-   * Find how many of the containing nodes are "extra", if any.
-   * If there are any extras, call chooseExcessReplicates() to
-   * mark them in the excessReplicateMap.
-   */
-  private void processOverReplicatedBlock(Block block, short replication, 
-      DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
-    if(addedNode == delNodeHint) {
-      delNodeHint = null;
-    }
-    Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
-      if (excessBlocks == null || !excessBlocks.contains(block)) {
-        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-          nonExcess.add(cur);
-        }
-      }
-    }
-    chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);    
-  }
-
-  /**
-   * We want "replication" replicates for the block, but we now have too many.  

[... 5119 lines stripped ...]