You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [8/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Apr 16 14:44:35 2007
@@ -51,1932 +51,1932 @@
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
 class FSNamesystem implements FSConstants {
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
 
-    //
-    // Stores the correct file name hierarchy
-    //
-    FSDirectory dir;
-
-    //
-    // Stores the block-->datanode(s) map.  Updated only in response
-    // to client-sent information.
-    // Mapping: Block -> { INode, datanodes, self ref } 
-    //
-    BlocksMap blocksMap = new BlocksMap();
+  //
+  // Stores the correct file name hierarchy
+  //
+  FSDirectory dir;
+
+  //
+  // Stores the block-->datanode(s) map.  Updated only in response
+  // to client-sent information.
+  // Mapping: Block -> { INode, datanodes, self ref } 
+  //
+  BlocksMap blocksMap = new BlocksMap();
     
-    /**
-     * Stores the datanode -> block map.  
-     * <p>
-     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
-     * storage id. In order to keep the storage map consistent it tracks 
-     * all storages ever registered with the namenode.
-     * A descriptor corresponding to a specific storage id can be
-     * <ul> 
-     * <li>added to the map if it is a new storage id;</li>
-     * <li>updated with a new datanode started as a replacement for the old one 
-     * with the same storage id; and </li>
-     * <li>removed if and only if an existing datanode is restarted to serve a
-     * different storage id.</li>
-     * </ul> <br>
-     * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
-     * in the namespace image file. Only the {@link DatanodeInfo} part is 
-     * persistent, the list of blocks is restored from the datanode block
-     * reports. 
-     * <p>
-     * Mapping: StorageID -> DatanodeDescriptor
-     */
-    Map<String, DatanodeDescriptor> datanodeMap = 
-                                      new TreeMap<String, DatanodeDescriptor>();
-
-    //
-    // Keeps a Collection for every named machine containing
-    // blocks that have recently been invalidated and are thought to live
-    // on the machine in question.
-    // Mapping: StorageID -> ArrayList<Block>
-    //
-    private Map<String, Collection<Block>> recentInvalidateSets = 
-                                      new TreeMap<String, Collection<Block>>();
-
-    //
-    // Keeps a TreeSet for every named node.  Each treeset contains
-    // a list of the blocks that are "extra" at that location.  We'll
-    // eventually remove these extras.
-    // Mapping: StorageID -> TreeSet<Block>
-    //
-    private Map<String, Collection<Block>> excessReplicateMap = 
-                                      new TreeMap<String, Collection<Block>>();
-
-    //
-    // Keeps track of files that are being created, plus the
-    // blocks that make them up.
-    // Mapping: fileName -> FileUnderConstruction
-    //
-    Map<UTF8, FileUnderConstruction> pendingCreates = 
-                                  new TreeMap<UTF8, FileUnderConstruction>();
-
-    //
-    // Keeps track of the blocks that are part of those pending creates
-    // Set of: Block
-    //
-    Collection<Block> pendingCreateBlocks = new TreeSet<Block>();
-
-    //
-    // Stats on overall usage
-    //
-    long totalCapacity = 0, totalRemaining = 0;
+  /**
+   * Stores the datanode -> block map.  
+   * <p>
+   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+   * storage id. In order to keep the storage map consistent it tracks 
+   * all storages ever registered with the namenode.
+   * A descriptor corresponding to a specific storage id can be
+   * <ul> 
+   * <li>added to the map if it is a new storage id;</li>
+   * <li>updated with a new datanode started as a replacement for the old one 
+   * with the same storage id; and </li>
+   * <li>removed if and only if an existing datanode is restarted to serve a
+   * different storage id.</li>
+   * </ul> <br>
+   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+   * in the namespace image file. Only the {@link DatanodeInfo} part is 
+   * persistent, the list of blocks is restored from the datanode block
+   * reports. 
+   * <p>
+   * Mapping: StorageID -> DatanodeDescriptor
+   */
+  Map<String, DatanodeDescriptor> datanodeMap = 
+    new TreeMap<String, DatanodeDescriptor>();
 
-    // total number of connections per live datanode
-    int totalLoad = 0;
+  //
+  // Keeps a Collection for every named machine containing
+  // blocks that have recently been invalidated and are thought to live
+  // on the machine in question.
+  // Mapping: StorageID -> ArrayList<Block>
+  //
+  private Map<String, Collection<Block>> recentInvalidateSets = 
+    new TreeMap<String, Collection<Block>>();
+
+  //
+  // Keeps a TreeSet for every named node.  Each treeset contains
+  // a list of the blocks that are "extra" at that location.  We'll
+  // eventually remove these extras.
+  // Mapping: StorageID -> TreeSet<Block>
+  //
+  private Map<String, Collection<Block>> excessReplicateMap = 
+    new TreeMap<String, Collection<Block>>();
+
+  //
+  // Keeps track of files that are being created, plus the
+  // blocks that make them up.
+  // Mapping: fileName -> FileUnderConstruction
+  //
+  Map<UTF8, FileUnderConstruction> pendingCreates = 
+    new TreeMap<UTF8, FileUnderConstruction>();
+
+  //
+  // Keeps track of the blocks that are part of those pending creates
+  // Set of: Block
+  //
+  Collection<Block> pendingCreateBlocks = new TreeSet<Block>();
+
+  //
+  // Stats on overall usage
+  //
+  long totalCapacity = 0, totalRemaining = 0;
+
+  // total number of connections per live datanode
+  int totalLoad = 0;
+
+
+  //
+  // For the HTTP browsing interface
+  //
+  StatusHttpServer infoServer;
+  int infoPort;
+  String infoBindAddress;
+  Date startTime;
+    
+  //
+  Random r = new Random();
 
+  /**
+   * Stores a set of DatanodeDescriptor objects.
+   * This is a subset of {@link #datanodeMap}, containing nodes that are 
+   * considered alive.
+   * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+   * and removes them from the list.
+   */
+  ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
 
-    //
-    // For the HTTP browsing interface
-    //
-    StatusHttpServer infoServer;
-    int infoPort;
-    String infoBindAddress;
-    Date startTime;
-    
-    //
-    Random r = new Random();
+  //
+  // Store set of Blocks that need to be replicated 1 or more times.
+  // We also store pending replication-orders.
+  // Set of: Block
+  //
+  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  private PendingReplicationBlocks pendingReplications;
+
+  //
+  // Used for handling lock-leases
+  // Mapping: leaseHolder -> Lease
+  //
+  private Map<UTF8, Lease> leases = new TreeMap<UTF8, Lease>();
+  // Set of: Lease
+  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
+
+  //
+  // Threaded object that checks to see if we have been
+  // getting heartbeats from all clients. 
+  //
+  Daemon hbthread = null;   // HeartbeatMonitor thread
+  Daemon lmthread = null;   // LeaseMonitor thread
+  Daemon smmthread = null;  // SafeModeMonitor thread
+  Daemon replthread = null;  // Replication thread
+  volatile boolean fsRunning = true;
+  long systemStart = 0;
+
+  //  The maximum number of replicates we should allow for a single block
+  private int maxReplication;
+  //  How many outgoing replication streams a given node should have at one time
+  private int maxReplicationStreams;
+  // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+  private int minReplication;
+  // Default replication
+  private int defaultReplication;
+  // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+  private long heartbeatRecheckInterval;
+  // heartbeatExpireInterval is how long namenode waits for datanode to report
+  // heartbeat
+  private long heartbeatExpireInterval;
+  //replicationRecheckInterval is how often namenode checks for new replication work
+  private long replicationRecheckInterval;
+  static int replIndex = 0; // last datanode used for replication work
+  static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
+
+  public static FSNamesystem fsNamesystemObject;
+  private String localMachine;
+  private int port;
+  private SafeModeInfo safeMode;  // safe mode information
+    
+  // datanode network topology
+  NetworkTopology clusterMap = new NetworkTopology();
+  // for block replicas placement
+  ReplicationTargetChooser replicator;
 
-    /**
-     * Stores a set of DatanodeDescriptor objects.
-     * This is a subset of {@link #datanodeMap}, containing nodes that are 
-     * considered alive.
-     * The {@link HeartbeatMonitor} periodically checks for outdated entries,
-     * and removes them from the list.
-     */
-    ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
-    //
-    // Store set of Blocks that need to be replicated 1 or more times.
-    // We also store pending replication-orders.
-    // Set of: Block
-    //
-    private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-    private PendingReplicationBlocks pendingReplications;
-
-    //
-    // Used for handling lock-leases
-    // Mapping: leaseHolder -> Lease
-    //
-    private Map<UTF8, Lease> leases = new TreeMap<UTF8, Lease>();
-    // Set of: Lease
-    private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
-
-    //
-    // Threaded object that checks to see if we have been
-    // getting heartbeats from all clients. 
-    //
-    Daemon hbthread = null;   // HeartbeatMonitor thread
-    Daemon lmthread = null;   // LeaseMonitor thread
-    Daemon smmthread = null;  // SafeModeMonitor thread
-    Daemon replthread = null;  // Replication thread
-    volatile boolean fsRunning = true;
-    long systemStart = 0;
-
-    //  The maximum number of replicates we should allow for a single block
-    private int maxReplication;
-    //  How many outgoing replication streams a given node should have at one time
-    private int maxReplicationStreams;
-    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
-    private int minReplication;
-    // Default replication
-    private int defaultReplication;
-    // heartbeatRecheckInterval is how often namenode checks for expired datanodes
-    private long heartbeatRecheckInterval;
-    // heartbeatExpireInterval is how long namenode waits for datanode to report
-    // heartbeat
-    private long heartbeatExpireInterval;
-    //replicationRecheckInterval is how often namenode checks for new replication work
-    private long replicationRecheckInterval;
-    static int replIndex = 0; // last datanode used for replication work
-    static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
-
-    public static FSNamesystem fsNamesystemObject;
-    private String localMachine;
-    private int port;
-    private SafeModeInfo safeMode;  // safe mode information
-    
-    // datanode networktoplogy
-    NetworkTopology clusterMap = new NetworkTopology();
-    // for block replicas placement
-    ReplicationTargetChooser replicator;
+  private HostsFileReader hostsReader; 
+  private Daemon dnthread = null;
 
-    private HostsFileReader hostsReader; 
-    private Daemon dnthread = null;
+  /**
+   * dirs is a list of directories where the filesystem directory state 
+   * is stored
+   */
+  public FSNamesystem(String hostname,
+                      int port,
+                      NameNode nn, Configuration conf) throws IOException {
+    fsNamesystemObject = this;
+    this.replicator = new ReplicationTargetChooser(
+                                                   conf.getBoolean("dfs.replication.considerLoad", true));
+    this.defaultReplication = conf.getInt("dfs.replication", 3);
+    this.maxReplication = conf.getInt("dfs.replication.max", 512);
+    this.minReplication = conf.getInt("dfs.replication.min", 1);
+    if( minReplication <= 0 )
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.min = " 
+                            + minReplication
+                            + " must be greater than 0" );
+    if( maxReplication >= (int)Short.MAX_VALUE )
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.max = " 
+                            + maxReplication + " must be less than " + (Short.MAX_VALUE) );
+    if( maxReplication < minReplication )
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.min = " 
+                            + minReplication
+                            + " must be less than dfs.replication.max = " 
+                            + maxReplication );
+    this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+    long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+    this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
+    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+      10 * heartbeatInterval;
+    this.replicationRecheckInterval = 3 * 1000; //  3 second
+
+    this.localMachine = hostname;
+    this.port = port;
+    this.dir = new FSDirectory( this );
+    StartupOption startOpt = (StartupOption)conf.get( 
+                                                     "dfs.namenode.startup", StartupOption.REGULAR );
+    this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
+    this.safeMode = new SafeModeInfo( conf );
+    setBlockTotal();
+    pendingReplications = new PendingReplicationBlocks(LOG);
+    this.hbthread = new Daemon(new HeartbeatMonitor());
+    this.lmthread = new Daemon(new LeaseMonitor());
+    this.replthread = new Daemon(new ReplicationMonitor());
+    hbthread.start();
+    lmthread.start();
+    replthread.start();
+    this.systemStart = now();
+    this.startTime = new Date(systemStart); 
+        
+    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+                                           conf.get("dfs.hosts.exclude",""));
+    this.dnthread = new Daemon(new DecommissionedMonitor());
+    dnthread.start();
+
+    this.infoPort = conf.getInt("dfs.info.port", 50070);
+    this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
+    this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+    this.infoServer.setAttribute("name.system", this);
+    this.infoServer.setAttribute("name.node", nn);
+    this.infoServer.setAttribute("name.conf", conf);
+    this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
+    this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+    this.infoServer.start();
+        
+    // The web-server port can be ephemeral... ensure we have the correct info
+    this.infoPort = this.infoServer.getPort();
+    conf.set("dfs.info.port", this.infoPort); 
+    LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
+  }
 
-    /**
-     * dirs is a list oif directories where the filesystem directory state 
-     * is stored
-     */
-    public FSNamesystem(String hostname,
-                        int port,
-                        NameNode nn, Configuration conf) throws IOException {
-        fsNamesystemObject = this;
-        this.replicator = new ReplicationTargetChooser(
-                conf.getBoolean("dfs.replication.considerLoad", true));
-        this.defaultReplication = conf.getInt("dfs.replication", 3);
-        this.maxReplication = conf.getInt("dfs.replication.max", 512);
-        this.minReplication = conf.getInt("dfs.replication.min", 1);
-        if( minReplication <= 0 )
-          throw new IOException(
-              "Unexpected configuration parameters: dfs.replication.min = " 
-              + minReplication
-              + " must be greater than 0" );
-        if( maxReplication >= (int)Short.MAX_VALUE )
-          throw new IOException(
-              "Unexpected configuration parameters: dfs.replication.max = " 
-              + maxReplication + " must be less than " + (Short.MAX_VALUE) );
-        if( maxReplication < minReplication )
-          throw new IOException(
-              "Unexpected configuration parameters: dfs.replication.min = " 
-              + minReplication
-              + " must be less than dfs.replication.max = " 
-              + maxReplication );
-        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
-        long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
-        this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
-        this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
-            10 * heartbeatInterval;
-        this.replicationRecheckInterval = 3 * 1000; //  3 second
-
-        this.localMachine = hostname;
-        this.port = port;
-        this.dir = new FSDirectory( this );
-        StartupOption startOpt = (StartupOption)conf.get( 
-                                "dfs.namenode.startup", StartupOption.REGULAR );
-        this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
-        this.safeMode = new SafeModeInfo( conf );
-        setBlockTotal();
-        pendingReplications = new PendingReplicationBlocks(LOG);
-        this.hbthread = new Daemon(new HeartbeatMonitor());
-        this.lmthread = new Daemon(new LeaseMonitor());
-        this.replthread = new Daemon(new ReplicationMonitor());
-        hbthread.start();
-        lmthread.start();
-        replthread.start();
-        this.systemStart = now();
-        this.startTime = new Date(systemStart); 
-        
-        this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
-                                               conf.get("dfs.hosts.exclude",""));
-        this.dnthread = new Daemon(new DecommissionedMonitor());
-        dnthread.start();
-
-        this.infoPort = conf.getInt("dfs.info.port", 50070);
-        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
-        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
-        this.infoServer.setAttribute("name.system", this);
-        this.infoServer.setAttribute("name.node", nn);
-        this.infoServer.setAttribute("name.conf", conf);
-        this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
-        this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
-        this.infoServer.start();
-        
-        // The web-server port can be ephemeral... ensure we have the correct info
-        this.infoPort = this.infoServer.getPort();
-        conf.set("dfs.info.port", this.infoPort); 
-        LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
-    }
-
-    static Collection<File> getNamespaceDirs(Configuration conf) {
-      String[] dirNames = conf.getStrings("dfs.name.dir");
-      if (dirNames == null)
-        dirNames = new String[] {"/tmp/hadoop/dfs/name"};
-      Collection<File> dirs = new ArrayList<File>( dirNames.length );
-      for( int idx = 0; idx < dirNames.length; idx++ ) {
-        dirs.add( new File(dirNames[idx] ));
-      }
-      return dirs;
+  static Collection<File> getNamespaceDirs(Configuration conf) {
+    String[] dirNames = conf.getStrings("dfs.name.dir");
+    if (dirNames == null)
+      dirNames = new String[] {"/tmp/hadoop/dfs/name"};
+    Collection<File> dirs = new ArrayList<File>( dirNames.length );
+    for( int idx = 0; idx < dirNames.length; idx++ ) {
+      dirs.add( new File(dirNames[idx] ));
     }
+    return dirs;
+  }
 
-    /**
-     * dirs is a list of directories where the filesystem directory state 
-     * is stored
-     */
-    FSNamesystem(FSImage fsImage) throws IOException {
-        fsNamesystemObject = this;
-        this.dir = new FSDirectory(fsImage, this);
-    }
+  /**
+   * dirs is a list of directories where the filesystem directory state 
+   * is stored
+   */
+  FSNamesystem(FSImage fsImage) throws IOException {
+    fsNamesystemObject = this;
+    this.dir = new FSDirectory(fsImage, this);
+  }
 
-    /** Return the FSNamesystem object
-     * 
-     */
-    public static FSNamesystem getFSNamesystem() {
-        return fsNamesystemObject;
-    } 
-    
-    NamespaceInfo getNamespaceInfo() {
-      return new NamespaceInfo( dir.fsImage.getNamespaceID(),
-                                dir.fsImage.getCTime() );
-    }
+  /** Return the FSNamesystem object
+   * 
+   */
+  public static FSNamesystem getFSNamesystem() {
+    return fsNamesystemObject;
+  } 
+    
+  NamespaceInfo getNamespaceInfo() {
+    return new NamespaceInfo( dir.fsImage.getNamespaceID(),
+                              dir.fsImage.getCTime() );
+  }
 
-    /** Close down this filesystem manager.
-     * Causes heartbeat and lease daemons to stop; waits briefly for
-     * them to finish, but a short timeout returns control back to caller.
-     */
-    public void close() {
-        fsRunning = false;
+  /** Close down this filesystem manager.
+   * Causes heartbeat and lease daemons to stop; waits briefly for
+   * them to finish, but a short timeout returns control back to caller.
+   */
+  public void close() {
+    fsRunning = false;
+    try {
+      if (pendingReplications != null) pendingReplications.stop();
+      if (infoServer != null) infoServer.stop();
+      if (hbthread != null) hbthread.interrupt();
+      if (replthread != null) replthread.interrupt();
+      if (dnthread != null) dnthread.interrupt();
+      if (smmthread != null) smmthread.interrupt();
+    } catch (InterruptedException ie) {
+    } finally {
+      // using finally to ensure we also wait for lease daemon
+      try {
+        if (lmthread != null) {
+          lmthread.interrupt();
+          lmthread.join(3000);
+        }
+      } catch (InterruptedException ie) {
+      } finally {
         try {
-          if (pendingReplications != null) pendingReplications.stop();
-          if (infoServer != null) infoServer.stop();
-          if (hbthread != null) hbthread.interrupt();
-          if (replthread != null) replthread.interrupt();
-          if (dnthread != null) dnthread.interrupt();
-          if (smmthread != null) smmthread.interrupt();
-        } catch (InterruptedException ie) {
-        } finally {
-          // using finally to ensure we also wait for lease daemon
-          try {
-            if (lmthread != null) {
-              lmthread.interrupt();
-              lmthread.join(3000);
-            }
-          } catch (InterruptedException ie) {
-          } finally {
-              try {
-                dir.close();
-              } catch (IOException ex) {
-                  // do nothing
-              }
-          }
+          dir.close();
+        } catch (IOException ex) {
+          // do nothing
         }
+      }
     }
+  }
 
-    /**
-     * Dump all metadata into specified file
-     */
-    void metaSave(String filename) throws IOException {
-      File file = new File(System.getProperty("hadoop.log.dir"), 
-                           filename);
-      PrintWriter out = new PrintWriter(new BufferedWriter(
-                                        new FileWriter(file, true)));
+  /**
+   * Dump all metadata into specified file
+   */
+  void metaSave(String filename) throws IOException {
+    File file = new File(System.getProperty("hadoop.log.dir"), 
+                         filename);
+    PrintWriter out = new PrintWriter(new BufferedWriter(
+                                                         new FileWriter(file, true)));
  
 
-      //
-      // Dump contents of neededReplication
-      //
-      synchronized (neededReplications) {
-        out.println("Metasave: Blocks waiting for replication: " + 
-                    neededReplications.size());
-        if (neededReplications.size() > 0) {
-          for (Iterator<Block> it = neededReplications.iterator(); 
-               it.hasNext();) {
-            Block block = it.next();
-            out.print(block);
-            for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-                 jt.hasNext(); ) {
-              DatanodeDescriptor node = jt.next();
-              out.print(" " + node + " : " );
-            }
-            out.println("");
+    //
+    // Dump contents of neededReplication
+    //
+    synchronized (neededReplications) {
+      out.println("Metasave: Blocks waiting for replication: " + 
+                  neededReplications.size());
+      if (neededReplications.size() > 0) {
+        for (Iterator<Block> it = neededReplications.iterator(); 
+             it.hasNext();) {
+          Block block = it.next();
+          out.print(block);
+          for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+               jt.hasNext(); ) {
+            DatanodeDescriptor node = jt.next();
+            out.print(" " + node + " : " );
           }
+          out.println("");
         }
       }
+    }
 
-      //
-      // Dump blocks from pendingReplication
-      //
-      pendingReplications.metaSave(out);
+    //
+    // Dump blocks from pendingReplication
+    //
+    pendingReplications.metaSave(out);
 
-      //
-      // Dump blocks that are waiting to be deleted
-      //
-      dumpRecentInvalidateSets(out);
+    //
+    // Dump blocks that are waiting to be deleted
+    //
+    dumpRecentInvalidateSets(out);
 
-      //
-      // Dump all datanodes
-      //
-      datanodeDump(out);
+    //
+    // Dump all datanodes
+    //
+    datanodeDump(out);
 
-      out.flush();
-      out.close();
-    }
+    out.flush();
+    out.close();
+  }
     
-    /* get replication factor of a block */
-    private int getReplication( Block block ) {
-        FSDirectory.INode fileINode = blocksMap.getINode( block );
-        if( fileINode == null ) { // block does not belong to any file
-            return 0;
-        } else {
-            return fileINode.getReplication();
-        }
+  /* get replication factor of a block */
+  private int getReplication( Block block ) {
+    FSDirectory.INode fileINode = blocksMap.getINode( block );
+    if( fileINode == null ) { // block does not belong to any file
+      return 0;
+    } else {
+      return fileINode.getReplication();
     }
+  }
 
-    /* Class for keeping track of under replication blocks
-     * Blocks have replication priority, with priority 0 indicating the highest
-     * Blocks have only one replicas has the highest
-     */
-    private class UnderReplicatedBlocks {
-        private static final int LEVEL = 3;
-        TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
+  /* Class for keeping track of under replication blocks
+   * Blocks have replication priority, with priority 0 indicating the highest
+   * Blocks that have only one replica have the highest priority
+   */
+  private class UnderReplicatedBlocks {
+    private static final int LEVEL = 3;
+    TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
         
-        /* constructor */
-        UnderReplicatedBlocks() {
-            for(int i=0; i<LEVEL; i++) {
-                priorityQueues[i] = new TreeSet<Block>();
-            }
-        }
+    /* constructor */
+    UnderReplicatedBlocks() {
+      for(int i=0; i<LEVEL; i++) {
+        priorityQueues[i] = new TreeSet<Block>();
+      }
+    }
         
-        /* Return the total number of under replication blocks */
-        synchronized int size() {
-            int size = 0;
-            for( int i=0; i<LEVEL; i++ ) {
-                size += priorityQueues[i].size();
-            }
-            return size;
-        }
+    /* Return the total number of under replication blocks */
+    synchronized int size() {
+      int size = 0;
+      for( int i=0; i<LEVEL; i++ ) {
+        size += priorityQueues[i].size();
+      }
+      return size;
+    }
         
-        /* Check if a block is in the neededReplication queue */
-        synchronized boolean contains(Block block) {
-            for(TreeSet<Block> set:priorityQueues) {
-                if(set.contains(block)) return true;
-            }
-            return false;
-        }
+    /* Check if a block is in the neededReplication queue */
+    synchronized boolean contains(Block block) {
+      for(TreeSet<Block> set:priorityQueues) {
+        if(set.contains(block)) return true;
+      }
+      return false;
+    }
         
-        /* Return the priority of a block
-        * @param block a under replication block
-        * @param curReplicas current number of replicas of the block
-        * @param expectedReplicas expected number of replicas of the block
-        */
-        private int getPriority(Block block, 
-                int curReplicas, int expectedReplicas) {
-            if (curReplicas<=0 || curReplicas>=expectedReplicas) {
-                return LEVEL; // no need to replicate
-            } else if(curReplicas==1) {
-                return 0; // highest priority
-            } else if(curReplicas*3<expectedReplicas) {
-                return 1;
-            } else {
-                return 2;
-            }
-        }
+    /* Return the priority of a block
+     * @param block a under replication block
+     * @param curReplicas current number of replicas of the block
+     * @param expectedReplicas expected number of replicas of the block
+     */
+    private int getPriority(Block block, 
+                            int curReplicas, int expectedReplicas) {
+      if (curReplicas<=0 || curReplicas>=expectedReplicas) {
+        return LEVEL; // no need to replicate
+      } else if(curReplicas==1) {
+        return 0; // highest priority
+      } else if(curReplicas*3<expectedReplicas) {
+        return 1;
+      } else {
+        return 2;
+      }
+    }
         
-        /* add a block to a under replication queue according to its priority
-         * @param block a under replication block
-         * @param curReplicas current number of replicas of the block
-         * @param expectedReplicas expected number of replicas of the block
-         */
-        synchronized boolean add(
-            Block block, int curReplicas, int expectedReplicas) {
-            if(curReplicas<=0 || expectedReplicas <= curReplicas) {
-                return false;
-            }
-            int priLevel = getPriority(block, curReplicas, expectedReplicas);
-            if( priorityQueues[priLevel].add(block) ) {
-                NameNode.stateChangeLog.debug(
-                        "BLOCK* NameSystem.UnderReplicationBlock.add:"
-                      + block.getBlockName()
-                      + " has only "+curReplicas
-                      + " replicas and need " + expectedReplicas
-                      + " replicas so is added to neededReplications"
-                      + " at priority level " + priLevel );
-                return true;
-            }
-            return false;
-        }
+    /* add a block to an under-replication queue according to its priority
+     * @param block an under-replicated block
+     * @param curReplicas current number of replicas of the block
+     * @param expectedReplicas expected number of replicas of the block
+     */
+    synchronized boolean add(
+                             Block block, int curReplicas, int expectedReplicas) {
+      if(curReplicas<=0 || expectedReplicas <= curReplicas) {
+        return false;
+      }
+      int priLevel = getPriority(block, curReplicas, expectedReplicas);
+      if( priorityQueues[priLevel].add(block) ) {
+        NameNode.stateChangeLog.debug(
+                                      "BLOCK* NameSystem.UnderReplicationBlock.add:"
+                                      + block.getBlockName()
+                                      + " has only "+curReplicas
+                                      + " replicas and need " + expectedReplicas
+                                      + " replicas so is added to neededReplications"
+                                      + " at priority level " + priLevel );
+        return true;
+      }
+      return false;
+    }
 
-        /* add a block to a under replication queue */
-        synchronized boolean add(Block block) {
-            int expectedReplicas = getReplication(block);
-            return add(block,
-                       countContainingNodes( block ),
-                       expectedReplicas);
-        }
-        
-        /* remove a block from a under replication queue */
-        synchronized boolean remove(Block block, 
-                int oldReplicas, int oldExpectedReplicas) {
-            int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
-            return remove(block, priLevel);
-        }
-        
-        /* remove a block from a under replication queue given a priority*/
-        private boolean remove(Block block, int priLevel ) {
-            if( priLevel >= 0 && priLevel < LEVEL 
-                    && priorityQueues[priLevel].remove(block) ) {
-                NameNode.stateChangeLog.debug(
-                     "BLOCK* NameSystem.UnderReplicationBlock.remove: "
-                   + "Removing block " + block.getBlockName()
-                   + " from priority queue "+ priLevel );
-                return true;
-            } else {
-                for(int i=0; i<LEVEL; i++) {
-                    if( i!=priLevel && priorityQueues[i].remove(block) ) {
-                        NameNode.stateChangeLog.debug(
-                             "BLOCK* NameSystem.UnderReplicationBlock.remove: "
-                           + "Removing block " + block.getBlockName()
-                           + " from priority queue "+ i );
-                        return true;
-                    }
-                }
-            }
-            return false;
+    /* add a block to an under-replication queue */
+    synchronized boolean add(Block block) {
+      int expectedReplicas = getReplication(block);
+      return add(block,
+                 countContainingNodes( block ),
+                 expectedReplicas);
+    }
+        
+    /* remove a block from an under-replication queue */
+    synchronized boolean remove(Block block, 
+                                int oldReplicas, int oldExpectedReplicas) {
+      int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+      return remove(block, priLevel);
+    }
+        
+    /* remove a block from an under-replication queue, trying the given priority first */
+    private boolean remove(Block block, int priLevel ) {
+      if( priLevel >= 0 && priLevel < LEVEL 
+          && priorityQueues[priLevel].remove(block) ) {
+        NameNode.stateChangeLog.debug(
+                                      "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                                      + "Removing block " + block.getBlockName()
+                                      + " from priority queue "+ priLevel );
+        return true;
+      } else {
+        for(int i=0; i<LEVEL; i++) {
+          if( i!=priLevel && priorityQueues[i].remove(block) ) {
+            NameNode.stateChangeLog.debug(
+                                          "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                                          + "Removing block " + block.getBlockName()
+                                          + " from priority queue "+ i );
+            return true;
+          }
         }
+      }
+      return false;
+    }
         
-        /* remove a block from a under replication queue */
-        synchronized boolean remove(Block block) {
-            int curReplicas = countContainingNodes( block );
-            int expectedReplicas = getReplication(block);
-            return remove(block, curReplicas, expectedReplicas);
-        }
-        
-        /* update the priority level of a block */
-        synchronized void update(Block block,
-                int curReplicasDelta, int expectedReplicasDelta) {
-            int curReplicas = countContainingNodes( block );
-            int curExpectedReplicas = getReplication(block);
-            int oldReplicas = curReplicas-curReplicasDelta;
-            int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-            int curPri = getPriority(block, curReplicas, curExpectedReplicas);
-            int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
-            NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
-                               block +
-                               " curReplicas " + curReplicas +
-                               " curExpectedReplicas " + curExpectedReplicas +
-                               " oldReplicas " + oldReplicas +
-                               " oldExpectedReplicas  " + oldExpectedReplicas +
-                               " curPri  " + curPri +
-                               " oldPri  " + oldPri);
-            if( oldPri != LEVEL && oldPri != curPri ) {
-                remove(block, oldPri);
-            }
-            if( curPri != LEVEL && oldPri != curPri 
-                    && priorityQueues[curPri].add(block)) {
-                NameNode.stateChangeLog.debug(
-                        "BLOCK* NameSystem.UnderReplicationBlock.update:"
-                      + block.getBlockName()
-                      + " has only "+curReplicas
-                      + " replicas and need " + curExpectedReplicas
-                      + " replicas so is added to neededReplications"
-                      + " at priority level " + curPri );
-            }
-        }
+    /* remove a block from an under-replication queue */
+    synchronized boolean remove(Block block) {
+      int curReplicas = countContainingNodes( block );
+      int expectedReplicas = getReplication(block);
+      return remove(block, curReplicas, expectedReplicas);
+    }
+        
+    /* update the priority level of a block */
+    synchronized void update(Block block,
+                             int curReplicasDelta, int expectedReplicasDelta) {
+      int curReplicas = countContainingNodes( block );
+      int curExpectedReplicas = getReplication(block);
+      int oldReplicas = curReplicas-curReplicasDelta;
+      int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+      int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+      int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+      NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
+                                    block +
+                                    " curReplicas " + curReplicas +
+                                    " curExpectedReplicas " + curExpectedReplicas +
+                                    " oldReplicas " + oldReplicas +
+                                    " oldExpectedReplicas  " + oldExpectedReplicas +
+                                    " curPri  " + curPri +
+                                    " oldPri  " + oldPri);
+      if( oldPri != LEVEL && oldPri != curPri ) {
+        remove(block, oldPri);
+      }
+      if( curPri != LEVEL && oldPri != curPri 
+          && priorityQueues[curPri].add(block)) {
+        NameNode.stateChangeLog.debug(
+                                      "BLOCK* NameSystem.UnderReplicationBlock.update:"
+                                      + block.getBlockName()
+                                      + " has only "+curReplicas
+                                      + " replicas and need " + curExpectedReplicas
+                                      + " replicas so is added to neededReplications"
+                                      + " at priority level " + curPri );
+      }
+    }
         
-        /* return a iterator of all the under replication blocks */
-        synchronized Iterator<Block> iterator() {
-            return new Iterator<Block>() {
-                int level;
-                Iterator<Block>[] iterator = new Iterator[LEVEL];
+    /* return an iterator over all the under-replicated blocks */
+    synchronized Iterator<Block> iterator() {
+      return new Iterator<Block>() {
+        int level;
+        Iterator<Block>[] iterator = new Iterator[LEVEL];
                 
-                {
-                    level=0;
-                    for(int i=0; i<LEVEL; i++) {
-                        iterator[i] = priorityQueues[i].iterator();
-                    }
-                }
+        {
+          level=0;
+          for(int i=0; i<LEVEL; i++) {
+            iterator[i] = priorityQueues[i].iterator();
+          }
+        }
                 
-                private void update() {
-                    while( level< LEVEL-1 && !iterator[level].hasNext()  ) {
-                        level++;
-                    }
-                }
+        private void update() {
+          while( level< LEVEL-1 && !iterator[level].hasNext()  ) {
+            level++;
+          }
+        }
                 
-                public Block next() {
-                    update();
-                    return iterator[level].next();
-                }
+        public Block next() {
+          update();
+          return iterator[level].next();
+        }
                 
-                public boolean hasNext() {
-                    update();
-                    return iterator[level].hasNext();
-                }
+        public boolean hasNext() {
+          update();
+          return iterator[level].hasNext();
+        }
                 
-                public void remove() {
-                    iterator[level].remove();
-                }
-            };
+        public void remove() {
+          iterator[level].remove();
         }
+      };
     }
+  }
     
-    /////////////////////////////////////////////////////////
-    //
-    // These methods are called by HadoopFS clients
-    //
-    /////////////////////////////////////////////////////////
-    /**
-     * The client wants to open the given filename.  Return a
-     * list of (block,machineArray) pairs.  The sequence of unique blocks
-     * in the list indicates all the blocks that make up the filename.
-     *
-     * The client should choose one of the machines from the machineArray
-     * at random.
-     */
-    public Object[] open(String clientMachine, UTF8 src) {
-        Object results[] = null;
-        Block blocks[] = dir.getFile(src);
-        if (blocks != null) {
-            results = new Object[2];
-            DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
-            DatanodeDescriptor client = 
-              host2DataNodeMap.getDatanodeByHost(clientMachine);
-
-            for (int i = 0; i < blocks.length; i++) {
-                int numNodes = blocksMap.numNodes( blocks[i] );
-                if ( numNodes <= 0 ) {
-                    machineSets[i] = new DatanodeDescriptor[0];
-                } else {
-                    machineSets[i] = new DatanodeDescriptor[ numNodes ];
-                    numNodes = 0;
-                    for( Iterator<DatanodeDescriptor> it = 
-                         blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
-                        machineSets[i][ numNodes++ ] = it.next();
-                    }
-                    clusterMap.sortByDistance( client, machineSets[i] );
-                }
-            }
-
-            results[0] = blocks;
-            results[1] = machineSets;
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by HadoopFS clients
+  //
+  /////////////////////////////////////////////////////////
+  /**
+   * The client wants to open the given filename.  Return a
+   * list of (block,machineArray) pairs.  The sequence of unique blocks
+   * in the list indicates all the blocks that make up the filename.
+   *
+   * The client should choose one of the machines from the machineArray
+   * at random.
+   */
+  public Object[] open(String clientMachine, UTF8 src) {
+    Object results[] = null;
+    Block blocks[] = dir.getFile(src);
+    if (blocks != null) {
+      results = new Object[2];
+      DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
+      DatanodeDescriptor client = 
+        host2DataNodeMap.getDatanodeByHost(clientMachine);
+
+      for (int i = 0; i < blocks.length; i++) {
+        int numNodes = blocksMap.numNodes( blocks[i] );
+        if ( numNodes <= 0 ) {
+          machineSets[i] = new DatanodeDescriptor[0];
+        } else {
+          machineSets[i] = new DatanodeDescriptor[ numNodes ];
+          numNodes = 0;
+          for( Iterator<DatanodeDescriptor> it = 
+                 blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
+            machineSets[i][ numNodes++ ] = it.next();
+          }
+          clusterMap.sortByDistance( client, machineSets[i] );
         }
-        return results;
+      }
+
+      results[0] = blocks;
+      results[1] = machineSets;
     }
+    return results;
+  }
 
-    /**
-     * Set replication for an existing file.
-     * 
-     * The NameNode sets new replication and schedules either replication of 
-     * under-replicated data blocks or removal of the eccessive block copies 
-     * if the blocks are over-replicated.
-     * 
-     * @see ClientProtocol#setReplication(String, short)
-     * @param src file name
-     * @param replication new replication
-     * @return true if successful; 
-     *         false if file does not exist or is a directory
-     * @author shv
-     */
-    public synchronized boolean setReplication(String src, 
-                                               short replication
-                                              ) throws IOException {
-      if( isInSafeMode() )
-        throw new SafeModeException( "Cannot set replication for " + src, safeMode );
-      verifyReplication(src, replication, null );
+  /**
+   * Set replication for an existing file.
+   * 
+   * The NameNode sets new replication and schedules either replication of 
+   * under-replicated data blocks or removal of the excessive block copies 
+   * if the blocks are over-replicated.
+   * 
+   * @see ClientProtocol#setReplication(String, short)
+   * @param src file name
+   * @param replication new replication
+   * @return true if successful; 
+   *         false if file does not exist or is a directory
+   * @author shv
+   */
+  public synchronized boolean setReplication(String src, 
+                                             short replication
+                                             ) throws IOException {
+    if( isInSafeMode() )
+      throw new SafeModeException( "Cannot set replication for " + src, safeMode );
+    verifyReplication(src, replication, null );
 
-      Vector<Integer> oldReplication = new Vector<Integer>();
-      Block[] fileBlocks;
-      fileBlocks = dir.setReplication( src, replication, oldReplication );
-      if( fileBlocks == null )  // file not found or is a directory
-        return false;
-      int oldRepl = oldReplication.elementAt(0).intValue();
-      if( oldRepl == replication ) // the same replication
-        return true;
+    Vector<Integer> oldReplication = new Vector<Integer>();
+    Block[] fileBlocks;
+    fileBlocks = dir.setReplication( src, replication, oldReplication );
+    if( fileBlocks == null )  // file not found or is a directory
+      return false;
+    int oldRepl = oldReplication.elementAt(0).intValue();
+    if( oldRepl == replication ) // the same replication
+      return true;
 
-      // update needReplication priority queues
-      LOG.info("Increasing replication for file " + src 
-              + ". New replication is " + replication );
+    // update neededReplications priority queues
+    LOG.info("Increasing replication for file " + src 
+             + ". New replication is " + replication );
+    for( int idx = 0; idx < fileBlocks.length; idx++ )
+      neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+      
+    if( oldRepl > replication ) {  
+      // old replication > the new one; need to remove copies
+      LOG.info("Reducing replication for file " + src 
+               + ". New replication is " + replication );
       for( int idx = 0; idx < fileBlocks.length; idx++ )
-          neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
-      
-      if( oldRepl > replication ) {  
-        // old replication > the new one; need to remove copies
-        LOG.info("Reducing replication for file " + src 
-                + ". New replication is " + replication );
-        for( int idx = 0; idx < fileBlocks.length; idx++ )
-          proccessOverReplicatedBlock( fileBlocks[idx], replication );
-      }
-      return true;
+        proccessOverReplicatedBlock( fileBlocks[idx], replication );
     }
+    return true;
+  }
     
-    public long getBlockSize(String filename) throws IOException {
-      return dir.getBlockSize(filename);
-    }
+  public long getBlockSize(String filename) throws IOException {
+    return dir.getBlockSize(filename);
+  }
     
-    /**
-     * Check whether the replication parameter is within the range
-     * determined by system configuration.
-     */
-    private void verifyReplication( String src, 
-                                    short replication, 
-                                    UTF8 clientName 
+  /**
+   * Check whether the replication parameter is within the range
+   * determined by system configuration.
+   */
+  private void verifyReplication( String src, 
+                                  short replication, 
+                                  UTF8 clientName 
                                   ) throws IOException {
-      String text = "file " + src 
-              + ((clientName != null) ? " on client " + clientName : "")
-              + ".\n"
-              + "Requested replication " + replication;
-
-      if( replication > maxReplication )
-        throw new IOException( text + " exceeds maximum " + maxReplication );
-      
-      if( replication < minReplication )
-        throw new IOException(  
-            text + " is less than the required minimum " + minReplication );
-    }
+    String text = "file " + src 
+      + ((clientName != null) ? " on client " + clientName : "")
+      + ".\n"
+      + "Requested replication " + replication;
+
+    if( replication > maxReplication )
+      throw new IOException( text + " exceeds maximum " + maxReplication );
+      
+    if( replication < minReplication )
+      throw new IOException(  
+                            text + " is less than the required minimum " + minReplication );
+  }
     
-    /**
-     * The client would like to create a new block for the indicated
-     * filename.  Return an array that consists of the block, plus a set 
-     * of machines.  The first on this list should be where the client 
-     * writes data.  Subsequent items in the list must be provided in
-     * the connection to the first datanode.
-     * @return Return an array that consists of the block, plus a set
-     * of machines
-     * @throws IOException if the filename is invalid
-     *         {@link FSDirectory#isValidToCreate(UTF8)}.
-     */
-    public synchronized Object[] startFile( UTF8 src, 
-                                            UTF8 holder, 
-                                            UTF8 clientMachine, 
-                                            boolean overwrite,
-                                            short replication,
-                                            long blockSize
+  /**
+   * The client would like to create a new block for the indicated
+   * filename.  Return an array that consists of the block, plus a set 
+   * of machines.  The first on this list should be where the client 
+   * writes data.  Subsequent items in the list must be provided in
+   * the connection to the first datanode.
+   * @return an array that consists of the block, plus a set
+   * of machines
+   * @throws IOException if the filename is invalid
+   *         {@link FSDirectory#isValidToCreate(UTF8)}.
+   */
+  public synchronized Object[] startFile( UTF8 src, 
+                                          UTF8 holder, 
+                                          UTF8 clientMachine, 
+                                          boolean overwrite,
+                                          short replication,
+                                          long blockSize
                                           ) throws IOException {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
-            +src+" for "+holder+" at "+clientMachine);
-      if( isInSafeMode() )
-        throw new SafeModeException( "Cannot create file" + src, safeMode );
-      if (!isValidName(src.toString())) {
-        throw new IOException("Invalid file name: " + src);      	  
-      }
-      try {
-        FileUnderConstruction pendingFile = pendingCreates.get(src);
-        if (pendingFile != null) {
-          //
-          // If the file exists in pendingCreate, then it must be in our
-          // leases. Find the appropriate lease record.
-          //
-          Lease lease = leases.get(holder);
-          //
-          // We found the lease for this file. And surprisingly the original
-          // holder is trying to recreate this file. This should never occur.
-          //
-          if (lease != null) {
-            throw new AlreadyBeingCreatedException(
-                  "failed to create file " + src + " for " + holder +
-                  " on client " + clientMachine + 
-                  " because current leaseholder is trying to recreate file.");
-          }
-          //
-          // Find the original holder.
-          //
-          UTF8 oldholder = pendingFile.getClientName();
-          lease = leases.get(oldholder);
-          if (lease == null) {
-            throw new AlreadyBeingCreatedException(
-                  "failed to create file " + src + " for " + holder +
-                  " on client " + clientMachine + 
-                  " because pendingCreates is non-null but no leases found.");
-          }
-          //
-          // If the original holder has not renewed in the last SOFTLIMIT 
-          // period, then reclaim all resources and allow this request 
-          // to proceed. Otherwise, prevent this request from creating file.
-          //
-          if (lease.expiredSoftLimit()) {
-            lease.releaseLocks();
-            leases.remove(lease.holder);
-            LOG.info("Removing lease " + lease + " ");
-            if (!sortedLeases.remove(lease)) {
-              LOG.error("Unknown failure trying to remove " + lease + 
-                       " from lease set.");
-            }
-          } else  {
-            throw new AlreadyBeingCreatedException(
-                  "failed to create file " + src + " for " + holder +
-                  " on client " + clientMachine + 
-                  ", because this file is already being created by " +
-                  pendingFile.getClientName() + 
-                  " on " + pendingFile.getClientMachine());
-          }
+    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
+                                  +src+" for "+holder+" at "+clientMachine);
+    if( isInSafeMode() )
+      throw new SafeModeException( "Cannot create file" + src, safeMode );
+    if (!isValidName(src.toString())) {
+      throw new IOException("Invalid file name: " + src);      	  
+    }
+    try {
+      FileUnderConstruction pendingFile = pendingCreates.get(src);
+      if (pendingFile != null) {
+        //
+        // If the file exists in pendingCreate, then it must be in our
+        // leases. Find the appropriate lease record.
+        //
+        Lease lease = leases.get(holder);
+        //
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
+        //
+        if (lease != null) {
+          throw new AlreadyBeingCreatedException(
+                                                 "failed to create file " + src + " for " + holder +
+                                                 " on client " + clientMachine + 
+                                                 " because current leaseholder is trying to recreate file.");
         }
-
-        try {
-           verifyReplication(src.toString(), replication, clientMachine );
-        } catch( IOException e) {
-            throw new IOException( "failed to create "+e.getMessage());
-        }
-        if (!dir.isValidToCreate(src)) {
-          if (overwrite) {
-            delete(src);
-          } else {
-            throw new IOException("failed to create file " + src 
-                    +" on client " + clientMachine
-                    +" either because the filename is invalid or the file exists");
-          }
+        //
+        // Find the original holder.
+        //
+        UTF8 oldholder = pendingFile.getClientName();
+        lease = leases.get(oldholder);
+        if (lease == null) {
+          throw new AlreadyBeingCreatedException(
+                                                 "failed to create file " + src + " for " + holder +
+                                                 " on client " + clientMachine + 
+                                                 " because pendingCreates is non-null but no leases found.");
         }
-
-        // Get the array of replication targets
-        DatanodeDescriptor clientNode = 
-          host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
-        DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                      clientNode, null, blockSize);
-        if (targets.length < this.minReplication) {
-            if (clusterMap.getNumOfLeaves() == 0) {
-              throw new IOException("Failed to create file "+src
-                    + " on client " + clientMachine
-                    + " because this cluster has no datanodes.");
-            }
-            throw new IOException("Failed to create file "+src
-                    + " on client " + clientMachine
-                    + " because there were not enough datanodes available. "
-                    + "Found " + targets.length
-                    + " datanodes but MIN_REPLICATION for the cluster is "
-                    + "configured to be "
-                    + this.minReplication
-                    + ".");
-       }
-
-        // Reserve space for this pending file
-        pendingCreates.put(src, 
-                           new FileUnderConstruction(replication, 
-                                                     blockSize,
-                                                     holder,
-                                                     clientMachine, 
-                                                     clientNode));
-        NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
-                   +"add "+src+" to pendingCreates for "+holder );
-        synchronized (leases) {
-            Lease lease = leases.get(holder);
-            if (lease == null) {
-                lease = new Lease(holder);
-                leases.put(holder, lease);
-                sortedLeases.add(lease);
-            } else {
-                sortedLeases.remove(lease);
-                lease.renew();
-                sortedLeases.add(lease);
-            }
-            lease.startedCreate(src);
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then reclaim all resources and allow this request 
+        // to proceed. Otherwise, prevent this request from creating file.
+        //
+        if (lease.expiredSoftLimit()) {
+          lease.releaseLocks();
+          leases.remove(lease.holder);
+          LOG.info("Removing lease " + lease + " ");
+          if (!sortedLeases.remove(lease)) {
+            LOG.error("Unknown failure trying to remove " + lease + 
+                      " from lease set.");
+          }
+        } else  {
+          throw new AlreadyBeingCreatedException(
+                                                 "failed to create file " + src + " for " + holder +
+                                                 " on client " + clientMachine + 
+                                                 ", because this file is already being created by " +
+                                                 pendingFile.getClientName() + 
+                                                 " on " + pendingFile.getClientMachine());
         }
-
-        // Create next block
-        Object results[] = new Object[2];
-        results[0] = allocateBlock(src);
-        results[1] = targets;
-        return results;
-      } catch (IOException ie) {
-          NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
-                  +ie.getMessage());
-        throw ie;
       }
-    }
 
-    /**
-     * The client would like to obtain an additional block for the indicated
-     * filename (which is being written-to).  Return an array that consists
-     * of the block, plus a set of machines.  The first on this list should
-     * be where the client writes data.  Subsequent items in the list must
-     * be provided in the connection to the first datanode.
-     *
-     * Make sure the previous blocks have been reported by datanodes and
-     * are replicated.  Will return an empty 2-elt array if we want the
-     * client to "try again later".
-     */
-    public synchronized Object[] getAdditionalBlock(UTF8 src, 
-                                                    UTF8 clientName
-                                                    ) throws IOException {
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
-            +src+" for "+clientName);
-        if( isInSafeMode() )
-          throw new SafeModeException( "Cannot add block to " + src, safeMode );
-        FileUnderConstruction pendingFile = pendingCreates.get(src);
-        // make sure that we still have the lease on this file
-        if (pendingFile == null) {
-          throw new LeaseExpiredException("No lease on " + src);
-        }
-        if (!pendingFile.getClientName().equals(clientName)) {
-          throw new LeaseExpiredException("Lease mismatch on " + src + 
-              " owned by " + pendingFile.getClientName() + 
-              " and appended by " + clientName);
+      try {
+        verifyReplication(src.toString(), replication, clientMachine );
+      } catch( IOException e) {
+        throw new IOException( "failed to create "+e.getMessage());
+      }
+      if (!dir.isValidToCreate(src)) {
+        if (overwrite) {
+          delete(src);
+        } else {
+          throw new IOException("failed to create file " + src 
+                                +" on client " + clientMachine
+                                +" either because the filename is invalid or the file exists");
         }
+      }
 
-        //
-        // If we fail this, bad things happen!
-        //
-        if (!checkFileProgress(pendingFile, false)) {
-          throw new NotReplicatedYetException("Not replicated yet:" + src);
+      // Get the array of replication targets
+      DatanodeDescriptor clientNode = 
+        host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
+      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+                                                             clientNode, null, blockSize);
+      if (targets.length < this.minReplication) {
+        if (clusterMap.getNumOfLeaves() == 0) {
+          throw new IOException("Failed to create file "+src
+                                + " on client " + clientMachine
+                                + " because this cluster has no datanodes.");
+        }
+        throw new IOException("Failed to create file "+src
+                              + " on client " + clientMachine
+                              + " because there were not enough datanodes available. "
+                              + "Found " + targets.length
+                              + " datanodes but MIN_REPLICATION for the cluster is "
+                              + "configured to be "
+                              + this.minReplication
+                              + ".");
+      }
+
+      // Reserve space for this pending file
+      pendingCreates.put(src, 
+                         new FileUnderConstruction(replication, 
+                                                   blockSize,
+                                                   holder,
+                                                   clientMachine, 
+                                                   clientNode));
+      NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
+                                     +"add "+src+" to pendingCreates for "+holder );
+      synchronized (leases) {
+        Lease lease = leases.get(holder);
+        if (lease == null) {
+          lease = new Lease(holder);
+          leases.put(holder, lease);
+          sortedLeases.add(lease);
+        } else {
+          sortedLeases.remove(lease);
+          lease.renew();
+          sortedLeases.add(lease);
         }
+        lease.startedCreate(src);
+      }
 
-        // Get the array of replication targets
-        DatanodeDescriptor clientNode = pendingFile.getClientNode();
-        DatanodeDescriptor targets[] = replicator.chooseTarget(
-            (int)(pendingFile.getReplication()),
-            clientNode,
-            null,
-            pendingFile.getBlockSize());
-        if (targets.length < this.minReplication) {
-          throw new IOException("File " + src + " could only be replicated to " +
-                                targets.length + " nodes, instead of " +
-                                minReplication);
-        }
-        
-        // Create next block
-        return new Object[]{allocateBlock(src), targets};
+      // Create next block
+      Object results[] = new Object[2];
+      results[0] = allocateBlock(src);
+      results[1] = targets;
+      return results;
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+                                   +ie.getMessage());
+      throw ie;
     }
+  }
 
-    /**
-     * The client would like to let go of the given block
-     */
-    public synchronized boolean abandonBlock(Block b, UTF8 src) {
-        //
-        // Remove the block from the pending creates list
-        //
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                +b.getBlockName()+"of file "+src );
-        FileUnderConstruction pendingFile = pendingCreates.get(src);
-        if (pendingFile != null) {
-            Collection<Block> pendingVector = pendingFile.getBlocks();
-            for (Iterator<Block> it = pendingVector.iterator(); it.hasNext(); ) {
-                Block cur = it.next();
-                if (cur.compareTo(b) == 0) {
-                    pendingCreateBlocks.remove(cur);
-                    it.remove();
-                    NameNode.stateChangeLog.debug(
-                             "BLOCK* NameSystem.abandonBlock: "
-                            +b.getBlockName()
-                            +" is removed from pendingCreateBlock and pendingCreates");
-                    return true;
-                }
-            }
+  /**
+   * The client would like to obtain an additional block for the indicated
+   * filename (which is being written-to).  Return an array that consists
+   * of the block, plus a set of machines.  The first on this list should
+   * be where the client writes data.  Subsequent items in the list must
+   * be provided in the connection to the first datanode.
+   *
+   * Make sure the previous blocks have been reported by datanodes and
+   * are replicated.  If they are not, a NotReplicatedYetException is
+   * thrown to tell the client to "try again later".
+   */
+  public synchronized Object[] getAdditionalBlock(UTF8 src, 
+                                                  UTF8 clientName
+                                                  ) throws IOException {
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+                                  +src+" for "+clientName);
+    if( isInSafeMode() )
+      throw new SafeModeException( "Cannot add block to " + src, safeMode );
+    FileUnderConstruction pendingFile = pendingCreates.get(src);
+    // make sure that we still have the lease on this file
+    if (pendingFile == null) {
+      throw new LeaseExpiredException("No lease on " + src);
+    }
+    if (!pendingFile.getClientName().equals(clientName)) {
+      throw new LeaseExpiredException("Lease mismatch on " + src + 
+                                      " owned by " + pendingFile.getClientName() + 
+                                      " and appended by " + clientName);
+    }
+
+    //
+    // If we fail this, bad things happen!
+    //
+    if (!checkFileProgress(pendingFile, false)) {
+      throw new NotReplicatedYetException("Not replicated yet:" + src);
+    }
+
+    // Get the array of replication targets
+    DatanodeDescriptor clientNode = pendingFile.getClientNode();
+    DatanodeDescriptor targets[] = replicator.chooseTarget(
+                                                           (int)(pendingFile.getReplication()),
+                                                           clientNode,
+                                                           null,
+                                                           pendingFile.getBlockSize());
+    if (targets.length < this.minReplication) {
+      throw new IOException("File " + src + " could only be replicated to " +
+                            targets.length + " nodes, instead of " +
+                            minReplication);
+    }
+        
+    // Create next block
+    return new Object[]{allocateBlock(src), targets};
+  }
+
+  /**
+   * The client would like to let go of the given block
+   */
+  public synchronized boolean abandonBlock(Block b, UTF8 src) {
+    //
+    // Remove the block from the pending creates list
+    //
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                  +b.getBlockName()+"of file "+src );
+    FileUnderConstruction pendingFile = pendingCreates.get(src);
+    if (pendingFile != null) {
+      Collection<Block> pendingVector = pendingFile.getBlocks();
+      for (Iterator<Block> it = pendingVector.iterator(); it.hasNext(); ) {
+        Block cur = it.next();
+        if (cur.compareTo(b) == 0) {
+          pendingCreateBlocks.remove(cur);
+          it.remove();
+          NameNode.stateChangeLog.debug(
+                                        "BLOCK* NameSystem.abandonBlock: "
+                                        +b.getBlockName()
+                                        +" is removed from pendingCreateBlock and pendingCreates");
+          return true;
         }
-        return false;
+      }
     }
+    return false;
+  }
 
-    /**
-     * Abandon the entire file in progress
-     */
-    public synchronized void abandonFileInProgress(UTF8 src, 
-                                                   UTF8 holder
-                                                   ) throws IOException {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
-      synchronized (leases) {
-        // find the lease
-        Lease lease = leases.get(holder);
-        if (lease != null) {
-          // remove the file from the lease
-          if (lease.completedCreate(src)) {
-            // if we found the file in the lease, remove it from pendingCreates
-            internalReleaseCreate(src, holder);
-          } else {
-            LOG.info("Attempt by " + holder.toString() + 
-                " to release someone else's create lock on " + 
-                src.toString());
-          }
+  /**
+   * Abandon the entire file in progress
+   */
+  public synchronized void abandonFileInProgress(UTF8 src, 
+                                                 UTF8 holder
+                                                 ) throws IOException {
+    NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
+    synchronized (leases) {
+      // find the lease
+      Lease lease = leases.get(holder);
+      if (lease != null) {
+        // remove the file from the lease
+        if (lease.completedCreate(src)) {
+          // if we found the file in the lease, remove it from pendingCreates
+          internalReleaseCreate(src, holder);
         } else {
-          LOG.info("Attempt to release a lock from an unknown lease holder "
-              + holder.toString() + " for " + src.toString());
+          LOG.info("Attempt by " + holder.toString() + 
+                   " to release someone else's create lock on " + 
+                   src.toString());
         }
+      } else {
+        LOG.info("Attempt to release a lock from an unknown lease holder "
+                 + holder.toString() + " for " + src.toString());
       }
     }
+  }
 
-    /**
-     * Finalize the created file and make it world-accessible.  The
-     * FSNamesystem will already know the blocks that make up the file.
-     * Before we return, we make sure that all the file's blocks have 
-     * been reported by datanodes and are replicated correctly.
-     */
-    public synchronized int completeFile( UTF8 src, 
-                                          UTF8 holder) throws IOException {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
-        if( isInSafeMode() )
-          throw new SafeModeException( "Cannot complete file " + src, safeMode );
-        FileUnderConstruction pendingFile = pendingCreates.get(src);
-
-        if (dir.getFile(src) != null || pendingFile == null) {
-            NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
-                    + "failed to complete " + src
-                    + " because dir.getFile()==" + dir.getFile(src) 
-                    + " and " + pendingFile);
-            return OPERATION_FAILED;
-        } else if (! checkFileProgress(pendingFile, true)) {
-            return STILL_WAITING;
-        }
-        
-        Collection<Block> blocks = pendingFile.getBlocks();
-        int nrBlocks = blocks.size();
-        Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
+  /**
+   * Finalize the created file and make it world-accessible.  The
+   * FSNamesystem will already know the blocks that make up the file.
+   * Before we return, we make sure that all the file's blocks have 
+   * been reported by datanodes and are replicated correctly.
+   */
+  public synchronized int completeFile( UTF8 src, 
+                                        UTF8 holder) throws IOException {
+    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
+    if( isInSafeMode() )
+      throw new SafeModeException( "Cannot complete file " + src, safeMode );
+    FileUnderConstruction pendingFile = pendingCreates.get(src);
 
-        //
-        // We have the pending blocks, but they won't have
-        // length info in them (as they were allocated before
-        // data-write took place). Find the block stored in
-        // node descriptor.
-        //
-        for (int i = 0; i < nrBlocks; i++) {
-            Block b = pendingBlocks[i];
-            Block storedBlock = blocksMap.getStoredBlock( b );
-            if ( storedBlock != null ) {
-                pendingBlocks[i] = storedBlock;
-            }
-        }
-        
-        //
-        // Now we can add the (name,blocks) tuple to the filesystem
-        //
-        if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
-          return OPERATION_FAILED;
-        }
+    if (dir.getFile(src) != null || pendingFile == null) {
+      NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
+                                    + "failed to complete " + src
+                                    + " because dir.getFile()==" + dir.getFile(src) 
+                                    + " and " + pendingFile);
+      return OPERATION_FAILED;
+    } else if (! checkFileProgress(pendingFile, true)) {
+      return STILL_WAITING;
+    }
+        
+    Collection<Block> blocks = pendingFile.getBlocks();
+    int nrBlocks = blocks.size();
+    Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
 
-        // The file is no longer pending
-        pendingCreates.remove(src);
-        NameNode.stateChangeLog.debug(
-             "DIR* NameSystem.completeFile: " + src
-           + " is removed from pendingCreates");
-        for (int i = 0; i < nrBlocks; i++) {
-            pendingCreateBlocks.remove(pendingBlocks[i]);
-        }
+    //
+    // We have the pending blocks, but they won't have
+    // length info in them (as they were allocated before
+    // data-write took place). Find the block stored in
+    // node descriptor.
+    //
+    for (int i = 0; i < nrBlocks; i++) {
+      Block b = pendingBlocks[i];
+      Block storedBlock = blocksMap.getStoredBlock( b );
+      if ( storedBlock != null ) {
+        pendingBlocks[i] = storedBlock;
+      }
+    }
+        
+    //
+    // Now we can add the (name,blocks) tuple to the filesystem
+    //
+    if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
+      return OPERATION_FAILED;
+    }
 
-        synchronized (leases) {
-            Lease lease = leases.get(holder);
-            if (lease != null) {
-                lease.completedCreate(src);
-                if (! lease.hasLocks()) {
-                    leases.remove(holder);
-                    sortedLeases.remove(lease);
-                }
-            }
+    // The file is no longer pending
+    pendingCreates.remove(src);
+    NameNode.stateChangeLog.debug(
+                                  "DIR* NameSystem.completeFile: " + src
+                                  + " is removed from pendingCreates");
+    for (int i = 0; i < nrBlocks; i++) {
+      pendingCreateBlocks.remove(pendingBlocks[i]);
+    }
+
+    synchronized (leases) {
+      Lease lease = leases.get(holder);
+      if (lease != null) {
+        lease.completedCreate(src);
+        if (! lease.hasLocks()) {
+          leases.remove(holder);
+          sortedLeases.remove(lease);
         }
+      }
+    }
 
-        //
-        // REMIND - mjc - this should be done only after we wait a few secs.
-        // The namenode isn't giving datanodes enough time to report the
-        // replicated blocks that are automatically done as part of a client
-        // write.
-        //
+    //
+    // REMIND - mjc - this should be done only after we wait a few secs.
+    // The namenode isn't giving datanodes enough time to report the
+    // replicated blocks that are automatically done as part of a client
+    // write.
+    //
 
-        // Now that the file is real, we need to be sure to replicate
-        // the blocks.
-        int numExpectedReplicas = pendingFile.getReplication();
-        for (int i = 0; i < nrBlocks; i++) {
-            // filter out containingNodes that are marked for decommission.
-            int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
-            if (numCurrentReplica < numExpectedReplicas) {
-                neededReplications.add(
-                      pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
-            }
-        }
-        return COMPLETE_SUCCESS;
+    // Now that the file is real, we need to be sure to replicate
+    // the blocks.
+    int numExpectedReplicas = pendingFile.getReplication();
+    for (int i = 0; i < nrBlocks; i++) {
+      // filter out containingNodes that are marked for decommission.
+      int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
+      if (numCurrentReplica < numExpectedReplicas) {
+        neededReplications.add(
+                               pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
+      }
     }
+    return COMPLETE_SUCCESS;
+  }
 
-    static Random randBlockId = new Random();
+  static Random randBlockId = new Random();
     
-    /**
-     * Allocate a block at the given pending filename
-     */
-    synchronized Block allocateBlock(UTF8 src) {
-        Block b = null;
-        do {
-            b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
-        } while ( isValidBlock(b) );
-        FileUnderConstruction v = pendingCreates.get(src);
-        v.getBlocks().add(b);
-        pendingCreateBlocks.add(b);
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
-            +src+ ". "+b.getBlockName()+
-            " is created and added to pendingCreates and pendingCreateBlocks" );      
-        return b;
-    }
+  /**
+   * Allocate a block at the given pending filename
+   */
+  synchronized Block allocateBlock(UTF8 src) {
+    Block b = null;
+    do {
+      b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
+    } while ( isValidBlock(b) );
+    FileUnderConstruction v = pendingCreates.get(src);
+    v.getBlocks().add(b);
+    pendingCreateBlocks.add(b);
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
+                                  +src+ ". "+b.getBlockName()+
+                                  " is created and added to pendingCreates and pendingCreateBlocks" );      
+    return b;
+  }
 
-    /**
-     * Check that the indicated file's blocks are present and
-     * replicated.  If not, return false. If checkall is true, then check
-     * all blocks, otherwise check only penultimate block.
-     */
-    synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
-        if (checkall) {
-          //
-          // check all blocks of the file.
-          //
-          for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
-            if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
-                return false;
-            }
-          }
-        } else {
-          //
-          // check the penultimate block of this file
-          //
-          Block b = v.getPenultimateBlock();
-          if (b != null) {
-            if (blocksMap.numNodes(b) < this.minReplication) {
-                return false;
-            }
-          }
+  /**
+   * Check that the indicated file's blocks are present and replicated
+   * at least minReplication times.  If not, return false. If checkall is
+   * true, check all blocks, otherwise check only the penultimate block.
+   */
+  synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+    if (checkall) {
+      //
+      // check all blocks of the file.
+      //
+      for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
+        if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
+          return false;
         }
-        return true;
+      }
+    } else {
+      //
+      // check the penultimate block of this file
+      //
+      Block b = v.getPenultimateBlock();
+      if (b != null) {
+        if (blocksMap.numNodes(b) < this.minReplication) {
+          return false;
+        }
+      }
     }
+    return true;
+  }
 
-    /**
-     * Adds block to list of blocks which will be invalidated on 
-     * specified datanode.
-     */
-    private void addToInvalidates(Block b, DatanodeInfo n) {
-      Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
-      if (invalidateSet == null) {
-        invalidateSet = new ArrayList<Block>();
-        recentInvalidateSets.put(n.getStorageID(), invalidateSet);
-      }
-      invalidateSet.add(b);
+  /**
+   * Adds block to list of blocks which will be invalidated on 
+   * specified datanode.
+   */
+  private void addToInvalidates(Block b, DatanodeInfo n) {
+    Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+    if (invalidateSet == null) {
+      invalidateSet = new ArrayList<Block>();
+      recentInvalidateSets.put(n.getStorageID(), invalidateSet);
     }
+    invalidateSet.add(b);
+  }
 
-    /**
-     * dumps the contents of recentInvalidateSets
-     */
-    private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
-      Collection<Collection<Block>> values = recentInvalidateSets.values();
-      Iterator<Map.Entry<String,Collection<Block>>> it = 
-                                recentInvalidateSets.entrySet().iterator();
-      if (values.size() == 0) {
-        out.println("Metasave: Blocks waiting deletion: 0");
-        return;
-      }
-      out.println("Metasave: Blocks waiting deletion from " +
-                   values.size() + " datanodes.");
-      while (it.hasNext()) {
-        Map.Entry<String,Collection<Block>> entry = it.next();
-        String storageId = (String) entry.getKey();
-        DatanodeDescriptor node = datanodeMap.get(storageId);
-        Collection<Block> blklist = entry.getValue();
-        if (blklist.size() > 0) {
-          out.print(node.getName());
-          for (Iterator jt = blklist.iterator(); jt.hasNext();) {
-            Block block = (Block) jt.next();
-            out.print(" " + block); 
-          }
-          out.println("");
+  /**
+   * Dumps the contents of recentInvalidateSets to the given writer.
+   */
+  private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
+    Collection<Collection<Block>> values = recentInvalidateSets.values();
+    Iterator<Map.Entry<String,Collection<Block>>> it = 
+      recentInvalidateSets.entrySet().iterator();
+    if (values.size() == 0) {
+      out.println("Metasave: Blocks waiting deletion: 0");
+      return;
+    }
+    out.println("Metasave: Blocks waiting deletion from " +
+                values.size() + " datanodes.");
+    while (it.hasNext()) {
+      Map.Entry<String,Collection<Block>> entry = it.next();
+      String storageId = (String) entry.getKey();
+      DatanodeDescriptor node = datanodeMap.get(storageId);
+      Collection<Block> blklist = entry.getValue();
+      if (blklist.size() > 0) {
+        out.print(node.getName());
+        for (Iterator jt = blklist.iterator(); jt.hasNext();) {
+          Block block = (Block) jt.next();
+          out.print(" " + block); 
         }
+        out.println("");
       }
     }
+  }
 
-    /**
-     * Invalidates the given block on the given datanode.
-     */
-    public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
-        throws IOException {
-      NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
-                                    + blk.getBlockName() + " on " 
-                                    + dn.getName());
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
-      }
-
-      // Check how many copies we have of the block.  If we have at least one
-      // copy on a live node, then we can delete it. 
-      int count = countContainingNodes( blk );
-      if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() || 
-                                              dn.isDecommissioned() ))) {
-          addToInvalidates(blk, dn);
-          removeStoredBlock(blk, getDatanode(dn));
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-                                        + blk.getBlockName() + " on " 
-                                        + dn.getName() + " listed for deletion.");
-      } else {
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-                                        + blk.getBlockName() + " on " 
-                                        + dn.getName() + " is the only copy and was not deleted.");
-      }
+  /**
+   * Invalidates the given block on the given datanode.
+   */
+  public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+    throws IOException {
+    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
+                                 + blk.getBlockName() + " on " 
+                                 + dn.getName());
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+    }
+
+    // Check how many copies we have of the block.  If we have at least one
+    // copy on a live node, then we can delete it. 
+    int count = countContainingNodes( blk );
+    if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() || 
+                                            dn.isDecommissioned() ))) {
+      addToInvalidates(blk, dn);
+      removeStoredBlock(blk, getDatanode(dn));
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                   + blk.getBlockName() + " on " 

[... 5658 lines stripped ...]