You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [7/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon Apr 16 14:44:35 2007
@@ -39,722 +39,722 @@
  *************************************************/
 class FSDirectory implements FSConstants {
 
-    /******************************************************
-     * We keep an in-memory representation of the file/block
-     * hierarchy.
-     * 
-     * TODO: Factor out INode to a standalone class.
-     ******************************************************/
-    class INode {
-        private String name;
-        private INode parent;
-        private TreeMap<String, INode> children = null;
-        private Block blocks[];
-        private short blockReplication;
-
-        /**
-         */
-        INode(String name, Block blocks[], short replication) {
-            this.name = name;
-            this.parent = null;
-            this.blocks = blocks;
-            this.blockReplication = replication;
-        }
-
-        /**
-         */
-        INode(String name) {
-            this.name = name;
-            this.parent = null;
-            this.blocks = null;
-            this.blockReplication = 0;
-        }
-
-        /**
-         * Check whether it's a directory
-         */
-        synchronized public boolean isDir() {
-          return (blocks == null);
-        }
-        
-        /**
-         * Get block replication for the file 
-         * @return block replication
-         */
-        public short getReplication() {
-          return this.blockReplication;
-        }
-        
-        /**
-         * Get local file name
-         * @return local file name
-         */
-        String getLocalName() {
-          return name;
-        }
-
-        /**
-         * Get file blocks 
-         * @return file blocks
-         */
-        Block[] getBlocks() {
-          return this.blocks;
-        }
-        
-        /**
-         * Get parent directory 
-         * @return parent INode
-         */
-        INode getParent() {
-          return this.parent;
-        }
-
-        /**
-         * Get children iterator
-         * @return Iterator of children
-         */
-        Iterator<INode> getChildIterator() {
-          return ( children != null ) ?  children.values().iterator() : null;
-            // instead of null, we could return a static empty iterator.
-        }
-        
-        void addChild(String name, INode node) {
-          if ( children == null ) {
-            children = new TreeMap<String, INode>();
-          }
-          children.put(name, node);
-        }
+  /******************************************************
+   * We keep an in-memory representation of the file/block
+   * hierarchy.
+   * 
+   * TODO: Factor out INode to a standalone class.
+   ******************************************************/
+  class INode {
+    private String name;
+    private INode parent;
+    private TreeMap<String, INode> children = null;
+    private Block blocks[];
+    private short blockReplication;
 
-        /**
-         * This is the external interface
-         */
-        INode getNode(String target) {
-            if ( target == null || 
-                ! target.startsWith("/") || target.length() == 0) {
-                return null;
-            } else if (parent == null && "/".equals(target)) {
-                return this;
-            } else {
-                Vector components = new Vector();
-                int start = 0;
-                int slashid = 0;
-                while (start < target.length() && (slashid = target.indexOf('/', start)) >= 0) {
-                    components.add(target.substring(start, slashid));
-                    start = slashid + 1;
-                }
-                if (start < target.length()) {
-                    components.add(target.substring(start));
-                }
-                return getNode(components, 0);
-            }
-        }
-
-        /**
-         */
-        INode getNode(Vector components, int index) {
-            if (! name.equals((String) components.elementAt(index))) {
-                return null;
-            }
-            if (index == components.size()-1) {
-                return this;
-            }
-
-            // Check with children
-            INode child = this.getChild((String)components.elementAt(index+1));
-            if (child == null) {
-                return null;
-            } else {
-                return child.getNode(components, index+1);
-            }
-        }
-        
-        INode getChild( String name) {
-          return (children == null) ? null : children.get( name );
-        }
-
-        /**
-         * Add new INode to the file tree.
-         * Find the parent and insert 
-         * 
-         * @param path file path
-         * @param newNode INode to be added
-         * @return null if the node already exists; inserted INode, otherwise
-         * @throws FileNotFoundException 
-         * @author shv
-         */
-        INode addNode(String path, INode newNode) throws FileNotFoundException {
-          File target = new File( path );
-          // find parent
-          Path parent = new Path(path).getParent();
-          if (parent == null) { // add root
-              return null;
-          }
-          INode parentNode = getNode(parent.toString());
-          if (parentNode == null) {
-              throw new FileNotFoundException(
-                      "Parent path does not exist: "+path);
-          }
-          if (!parentNode.isDir()) {
-        	  throw new FileNotFoundException(
-        			  "Parent path is not a directory: "+path);
-          }
-           // check whether the parent already has a node with that name
-          String name = newNode.name = target.getName();
-          if( parentNode.getChild( name ) != null ) {
-            return null;
-          }
-          // insert into the parent children list
-          parentNode.addChild(name, newNode);
-          newNode.parent = parentNode;
-          return newNode;
-        }
-
-        /**
-         */
-        boolean removeNode() {
-            if (parent == null) {
-                return false;
-            } else {
-                parent.children.remove(name);
-                return true;
-            }
-        }
-          
-        /**
-         * Collect all the blocks at this INode and all its children.
-         * This operation is performed after a node is removed from the tree,
-         * and we want to GC all the blocks at this node and below.
-         */
-        void collectSubtreeBlocks(Vector v) {
-            if (blocks != null) {
-                for (int i = 0; i < blocks.length; i++) {
-                    v.add(blocks[i]);
-                }
-            }
-            incrDeletedFileCount();
-            for (Iterator<INode> it = getChildIterator(); it != null &&
-                                                          it.hasNext(); ) {
-                it.next().collectSubtreeBlocks(v);
-            }
-        }
-
-        /**
-         */
-        int numItemsInTree() {
-            int total = 0;
-            for (Iterator<INode> it = getChildIterator(); it != null && 
-                                                          it.hasNext(); ) {
-                total += it.next().numItemsInTree();
-            }
-            return total + 1;
-        }
-
-        /**
-         */
-        String computeName() {
-            if (parent != null) {
-                return parent.computeName() + "/" + name;
-            } else {
-                return name;
-            }
-        }
-
-        /**
-         */
-        long computeFileLength() {
-            long total = 0;
-            if (blocks != null) {
-                for (int i = 0; i < blocks.length; i++) {
-                    total += blocks[i].getNumBytes();
-                }
-            }
-            return total;
-        }
-
-        /**
-         */
-        long computeContentsLength() {
-            long total = computeFileLength();
-            for (Iterator<INode> it = getChildIterator(); it != null && 
-                                                          it.hasNext(); ) {
-                total += it.next().computeContentsLength();
-            }
-            return total;
-        }
-
-        /**
-         * Get the block size of the first block
-         * @return the number of bytes
-         */
-        public long getBlockSize() {
-          if (blocks == null || blocks.length == 0) {
-            return 0;
-          } else {
-            return blocks[0].getNumBytes();
-          }
-        }
-        
-        /**
-         */
-        void listContents(Vector v) {
-            if (parent != null && blocks != null) {
-                v.add(this);
-            }
-
-            for (Iterator<INode> it = getChildIterator(); it != null && 
-                                                          it.hasNext(); ) {
-                v.add(it.next());
-            }
-        }
+    /**
+     */
+    INode(String name, Block blocks[], short replication) {
+      this.name = name;
+      this.parent = null;
+      this.blocks = blocks;
+      this.blockReplication = replication;
     }
 
-    FSNamesystem namesystem = null;
-    INode rootDir = new INode("");
-    TreeMap activeLocks = new TreeMap();
-    FSImage fsImage;  
-    boolean ready = false;
-    // Metrics record
-    private MetricsRecord directoryMetrics = null;
-    
-    /** Access an existing dfs name directory. */
-    public FSDirectory(FSNamesystem ns) throws IOException {
-      this.fsImage = new FSImage();
-      namesystem = ns;
-      initialize();
+    /**
+     */
+    INode(String name) {
+      this.name = name;
+      this.parent = null;
+      this.blocks = null;
+      this.blockReplication = 0;
     }
 
-    public FSDirectory(FSImage fsImage, FSNamesystem ns) throws IOException {
-      this.fsImage = fsImage;
-      namesystem = ns;
-      initialize();
+    /**
+     * Check whether it's a directory
+     */
+    synchronized public boolean isDir() {
+      return (blocks == null);
     }
-    
-    private void initialize() {
-      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
-      directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
+        
+    /**
+     * Get block replication for the file 
+     * @return block replication
+     */
+    public short getReplication() {
+      return this.blockReplication;
     }
-
-    void loadFSImage( Collection<File> dataDirs,
-                      StartupOption startOpt ) throws IOException {
-      // format before starting up if requested
-      if( startOpt == StartupOption.FORMAT ) {
-        fsImage.setStorageDirectories( dataDirs );
-        fsImage.format();
-        startOpt = StartupOption.REGULAR;
-      }
-      try {
-        fsImage.recoverTransitionRead( dataDirs, startOpt );
-      } catch( IOException e ) {
-        fsImage.close();
-        throw e;
-      }
-      synchronized (this) {
-        this.ready = true;
-        this.notifyAll();
-      }
+        
+    /**
+     * Get local file name
+     * @return local file name
+     */
+    String getLocalName() {
+      return name;
     }
 
-    private void incrDeletedFileCount() {
-        directoryMetrics.incrMetric("files_deleted", 1);
-        directoryMetrics.update();
+    /**
+     * Get file blocks 
+     * @return file blocks
+     */
+    Block[] getBlocks() {
+      return this.blocks;
     }
-    
+        
     /**
-     * Shutdown the filestore
+     * Get parent directory 
+     * @return parent INode
      */
-    public void close() throws IOException {
-        fsImage.close();
+    INode getParent() {
+      return this.parent;
     }
 
     /**
-     * Block until the object is ready to be used.
+     * Get children iterator
+     * @return Iterator of children
      */
-    void waitForReady() {
-        if (! ready) {
-            synchronized (this) {
-                while (!ready) {
-                    try {
-                        this.wait(5000);
-                    } catch (InterruptedException ie) {
-                    }
-                }
-            }
-        }
+    Iterator<INode> getChildIterator() {
+      return ( children != null ) ?  children.values().iterator() : null;
+      // instead of null, we could return a static empty iterator.
+    }
+        
+    void addChild(String name, INode node) {
+      if ( children == null ) {
+        children = new TreeMap<String, INode>();
+      }
+      children.put(name, node);
     }
 
     /**
-     * Add the given filename to the fs.
+     * This is the external interface
      */
-    public boolean addFile(UTF8 path, Block[] blocks, short replication) {
-        waitForReady();
-
-        // Always do an implicit mkdirs for parent directory tree
-        String pathString = path.toString();
-        if( ! mkdirs(new Path(pathString).getParent().toString()) ) {
-           return false;
+    INode getNode(String target) {
+      if ( target == null || 
+           ! target.startsWith("/") || target.length() == 0) {
+        return null;
+      } else if (parent == null && "/".equals(target)) {
+        return this;
+      } else {
+        Vector components = new Vector();
+        int start = 0;
+        int slashid = 0;
+        while (start < target.length() && (slashid = target.indexOf('/', start)) >= 0) {
+          components.add(target.substring(start, slashid));
+          start = slashid + 1;
         }
-        INode newNode = new INode( new File(pathString).getName(), blocks, replication);
-        if( ! unprotectedAddFile(path, newNode) ) {
-           NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
-                    +"failed to add "+path+" with "
-                    +blocks.length+" blocks to the file system" );
-           return false;
+        if (start < target.length()) {
+          components.add(target.substring(start));
         }
-        // add create file record to log
-        fsImage.getEditLog().logCreateFile( newNode );
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-                +path+" with "+blocks.length+" blocks is added to the file system" );
-        return true;
+        return getNode(components, 0);
+      }
     }
-    
+
     /**
      */
-    boolean unprotectedAddFile(UTF8 path, INode newNode) {
-      synchronized (rootDir) {
-         try {
-            if( rootDir.addNode(path.toString(), newNode ) != null ) {
-                int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
-                // Add file->block mapping
-                for (int i = 0; i < nrBlocks; i++)
-                    namesystem.blocksMap.addINode(newNode.blocks[i], newNode);
-                return true;
-            } else {
-                return false;
-            }
-        } catch (FileNotFoundException e ) {
-            return false;
-        }
+    INode getNode(Vector components, int index) {
+      if (! name.equals((String) components.elementAt(index))) {
+        return null;
+      }
+      if (index == components.size()-1) {
+        return this;
+      }
+
+      // Check with children
+      INode child = this.getChild((String)components.elementAt(index+1));
+      if (child == null) {
+        return null;
+      } else {
+        return child.getNode(components, index+1);
       }
     }
-    
-    boolean unprotectedAddFile(UTF8 path, Block[] blocks, short replication ) {
-      return unprotectedAddFile( path,  
-                    new INode( path.toString(), blocks, replication ));
+        
+    INode getChild( String name) {
+      return (children == null) ? null : children.get( name );
     }
 
     /**
-     * Change the filename
-     */
-    public boolean renameTo(UTF8 src, UTF8 dst) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
-          +src+" to "+dst );
-      waitForReady();
-      if( ! unprotectedRenameTo(src, dst) )
-        return false;
-      fsImage.getEditLog().logRename(src, dst);
-      return true;
+     * Add new INode to the file tree.
+     * Find the parent and insert 
+     * 
+     * @param path file path
+     * @param newNode INode to be added
+     * @return null if the node already exists; inserted INode, otherwise
+     * @throws FileNotFoundException 
+     * @author shv
+     */
+    INode addNode(String path, INode newNode) throws FileNotFoundException {
+      File target = new File( path );
+      // find parent
+      Path parent = new Path(path).getParent();
+      if (parent == null) { // add root
+        return null;
+      }
+      INode parentNode = getNode(parent.toString());
+      if (parentNode == null) {
+        throw new FileNotFoundException(
+                                        "Parent path does not exist: "+path);
+      }
+      if (!parentNode.isDir()) {
+        throw new FileNotFoundException(
+                                        "Parent path is not a directory: "+path);
+      }
+      // check whether the parent already has a node with that name
+      String name = newNode.name = target.getName();
+      if( parentNode.getChild( name ) != null ) {
+        return null;
+      }
+      // insert into the parent children list
+      parentNode.addChild(name, newNode);
+      newNode.parent = parentNode;
+      return newNode;
     }
 
     /**
      */
-    boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {
-        synchronized(rootDir) {
-          String srcStr = src.toString();
-          String dstStr = dst.toString();
-            INode renamedNode = rootDir.getNode(srcStr);
-            if (renamedNode == null) {
-                NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                        +"failed to rename "+src+" to "+dst+ " because source does not exist" );
-                return false;
-            }
-            if (isDir(dst)) {
-              dstStr += "/" + new File(srcStr).getName();
-            }
-            if( rootDir.getNode(dstStr.toString()) != null ) {
-                NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                        +"failed to rename "+src+" to "+dstStr+ " because destination exists" );
-                return false;
-            }
-            renamedNode.removeNode();
-            
-            // the renamed node can be reused now
-            try {
-                if( rootDir.addNode(dstStr, renamedNode ) != null ) {
-                    NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-                        +src+" is renamed to "+dst );
-                    return true;
-                }
-            } catch (FileNotFoundException e ) {
-                NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                        +"failed to rename "+src+" to "+dst );
-                try {
-                    rootDir.addNode(srcStr, renamedNode); // put it back
-                }catch(FileNotFoundException e2) {                
-                }
-            }
-
-            return false;
-        }
+    boolean removeNode() {
+      if (parent == null) {
+        return false;
+      } else {
+        parent.children.remove(name);
+        return true;
+      }
     }
-
+          
     /**
-     * Set file replication
-     * 
-     * @param src file name
-     * @param replication new replication
-     * @param oldReplication old replication - output parameter
-     * @return array of file blocks
-     * @throws IOException
-     */
-    Block[] setReplication( String src, 
-                            short replication,
-                            Vector oldReplication
-                           ) throws IOException {
-      waitForReady();
-      Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication );
-      if( fileBlocks != null )  // log replication change
-        fsImage.getEditLog().logSetReplication( src, replication );
-      return fileBlocks;
-    }
-
-    Block[] unprotectedSetReplication(  String src, 
-                                        short replication,
-                                        Vector oldReplication
-                                      ) throws IOException {
-      if( oldReplication == null )
-        oldReplication = new Vector();
-      oldReplication.setSize(1);
-      oldReplication.set( 0, new Integer(-1) );
-      Block[] fileBlocks = null;
-      synchronized(rootDir) {
-        INode fileNode = rootDir.getNode(src);
-        if (fileNode == null)
-          return null;
-        if( fileNode.isDir() )
-          return null;
-        oldReplication.set( 0, new Integer( fileNode.blockReplication ));
-        fileNode.blockReplication = replication;
-        fileBlocks = fileNode.blocks;
+     * Collect all the blocks at this INode and all its children.
+     * This operation is performed after a node is removed from the tree,
+     * and we want to GC all the blocks at this node and below.
+     */
+    void collectSubtreeBlocks(Vector v) {
+      if (blocks != null) {
+        for (int i = 0; i < blocks.length; i++) {
+          v.add(blocks[i]);
+        }
+      }
+      incrDeletedFileCount();
+      for (Iterator<INode> it = getChildIterator(); it != null &&
+             it.hasNext(); ) {
+        it.next().collectSubtreeBlocks(v);
       }
-      return fileBlocks;
     }
 
     /**
-     * Get the blocksize of a file
-     * @param filename the filename
-     * @return the number of bytes in the first block
-     * @throws IOException if it is a directory or does not exist.
      */
-    public long getBlockSize(String filename) throws IOException {
-      synchronized (rootDir) {
-        INode fileNode = rootDir.getNode(filename);
-        if (fileNode == null) {
-          throw new IOException("Unknown file: " + filename);
-        }
-        if (fileNode.isDir()) {
-          throw new IOException("Getting block size of a directory: " + 
-                                filename);
-        }
-        return fileNode.getBlockSize();
+    int numItemsInTree() {
+      int total = 0;
+      for (Iterator<INode> it = getChildIterator(); it != null && 
+             it.hasNext(); ) {
+        total += it.next().numItemsInTree();
       }
+      return total + 1;
     }
-    
+
     /**
-     * Remove the file from management, return blocks
      */
-    public Block[] delete(UTF8 src) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "
-                +src );
-        waitForReady();
-        Block[] blocks = unprotectedDelete(src); 
-        if( blocks != null )
-          fsImage.getEditLog().logDelete( src );
-        return blocks;
+    String computeName() {
+      if (parent != null) {
+        return parent.computeName() + "/" + name;
+      } else {
+        return name;
+      }
     }
 
     /**
      */
-    Block[] unprotectedDelete(UTF8 src) {
-        synchronized (rootDir) {
-            INode targetNode = rootDir.getNode(src.toString());
-            if (targetNode == null) {
-                NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
-                        +"failed to remove "+src+" because it does not exist" );
-                return null;
-            } else {
-                //
-                // Remove the node from the namespace and GC all
-                // the blocks underneath the node.
-                //
-                if (! targetNode.removeNode()) {
-                    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
-                            +"failed to remove "+src+" because it does not have a parent" );
-                    return null;
-                } else {
-                    NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-                            +src+" is removed" );
-                    Vector v = new Vector();
-                    targetNode.collectSubtreeBlocks(v);
-                    for (Iterator it = v.iterator(); it.hasNext(); ) {
-                        Block b = (Block) it.next();
-                        namesystem.blocksMap.removeINode(b);
-                    }
-                    return (Block[]) v.toArray(new Block[v.size()]);
-                }
-            }
+    long computeFileLength() {
+      long total = 0;
+      if (blocks != null) {
+        for (int i = 0; i < blocks.length; i++) {
+          total += blocks[i].getNumBytes();
         }
+      }
+      return total;
     }
 
     /**
      */
-    public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {
-        TreeSet holders = (TreeSet) activeLocks.get(src);
-        if (holders == null) {
-            holders = new TreeSet();
-            activeLocks.put(src, holders);
-        }
-        if (exclusive && holders.size() > 0) {
-            return STILL_WAITING;
-        } else {
-            holders.add(holder);
-            return COMPLETE_SUCCESS;
-        }
+    long computeContentsLength() {
+      long total = computeFileLength();
+      for (Iterator<INode> it = getChildIterator(); it != null && 
+             it.hasNext(); ) {
+        total += it.next().computeContentsLength();
+      }
+      return total;
     }
 
     /**
+     * Get the block size of the first block
+     * @return the number of bytes
      */
-    public int releaseLock(UTF8 src, UTF8 holder) {
-        TreeSet holders = (TreeSet) activeLocks.get(src);
-        if (holders != null && holders.contains(holder)) {
-            holders.remove(holder);
-            if (holders.size() == 0) {
-                activeLocks.remove(src);
-            }
-            return COMPLETE_SUCCESS;
-        } else {
-            return OPERATION_FAILED;
-        }
+    public long getBlockSize() {
+      if (blocks == null || blocks.length == 0) {
+        return 0;
+      } else {
+        return blocks[0].getNumBytes();
+      }
     }
-
+        
     /**
-     * Get a listing of files given path 'src'
-     *
-     * This function is admittedly very inefficient right now.  We'll
-     * make it better later.
      */
-    public DFSFileInfo[] getListing(UTF8 src) {
-        String srcs = normalizePath(src);
+    void listContents(Vector v) {
+      if (parent != null && blocks != null) {
+        v.add(this);
+      }
 
-        synchronized (rootDir) {
-            INode targetNode = rootDir.getNode(srcs);
-            if (targetNode == null) {
-                return null;
-            } else {
-                Vector contents = new Vector();
-                targetNode.listContents(contents);
+      for (Iterator<INode> it = getChildIterator(); it != null && 
+             it.hasNext(); ) {
+        v.add(it.next());
+      }
+    }
+  }
 
-                DFSFileInfo listing[] = new DFSFileInfo[contents.size()];
-                int i = 0;
-                for (Iterator it = contents.iterator(); it.hasNext(); i++) {
-                    listing[i] = new DFSFileInfo( (INode) it.next() );
-                }
-                return listing;
-            }
+  FSNamesystem namesystem = null;
+  INode rootDir = new INode("");
+  TreeMap activeLocks = new TreeMap();
+  FSImage fsImage;  
+  boolean ready = false;
+  // Metrics record
+  private MetricsRecord directoryMetrics = null;
+    
+  /** Access an existing dfs name directory. */
+  public FSDirectory(FSNamesystem ns) throws IOException {
+    this.fsImage = new FSImage();
+    namesystem = ns;
+    initialize();
+  }
+
+  public FSDirectory(FSImage fsImage, FSNamesystem ns) throws IOException {
+    this.fsImage = fsImage;
+    namesystem = ns;
+    initialize();
+  }
+    
+  private void initialize() {
+    MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+    directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
+  }
+
+  void loadFSImage( Collection<File> dataDirs,
+                    StartupOption startOpt ) throws IOException {
+    // format before starting up if requested
+    if( startOpt == StartupOption.FORMAT ) {
+      fsImage.setStorageDirectories( dataDirs );
+      fsImage.format();
+      startOpt = StartupOption.REGULAR;
+    }
+    try {
+      fsImage.recoverTransitionRead( dataDirs, startOpt );
+    } catch( IOException e ) {
+      fsImage.close();
+      throw e;
+    }
+    synchronized (this) {
+      this.ready = true;
+      this.notifyAll();
+    }
+  }
+
+  private void incrDeletedFileCount() {
+    directoryMetrics.incrMetric("files_deleted", 1);
+    directoryMetrics.update();
+  }
+    
+  /**
+   * Shutdown the filestore
+   */
+  public void close() throws IOException {
+    fsImage.close();
+  }
+
+  /**
+   * Block until the object is ready to be used.
+   */
+  void waitForReady() {
+    if (! ready) {
+      synchronized (this) {
+        while (!ready) {
+          try {
+            this.wait(5000);
+          } catch (InterruptedException ie) {
+          }
         }
+      }
     }
+  }
 
-    /**
-     * Get the blocks associated with the file
-     */
-    public Block[] getFile(UTF8 src) {
-        waitForReady();
-        synchronized (rootDir) {
-            INode targetNode = rootDir.getNode(src.toString());
-            if (targetNode == null) {
-                return null;
-            } else {
-                return targetNode.blocks;
-            }
+  /**
+   * Add the given filename to the fs.
+   */
+  public boolean addFile(UTF8 path, Block[] blocks, short replication) {
+    waitForReady();
+
+    // Always do an implicit mkdirs for parent directory tree
+    String pathString = path.toString();
+    if( ! mkdirs(new Path(pathString).getParent().toString()) ) {
+      return false;
+    }
+    INode newNode = new INode( new File(pathString).getName(), blocks, replication);
+    if( ! unprotectedAddFile(path, newNode) ) {
+      NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
+                                   +"failed to add "+path+" with "
+                                   +blocks.length+" blocks to the file system" );
+      return false;
+    }
+    // add create file record to log
+    fsImage.getEditLog().logCreateFile( newNode );
+    NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+                                  +path+" with "+blocks.length+" blocks is added to the file system" );
+    return true;
+  }
+    
+  /**
+   */
+  boolean unprotectedAddFile(UTF8 path, INode newNode) {
+    synchronized (rootDir) {
+      try {
+        if( rootDir.addNode(path.toString(), newNode ) != null ) {
+          int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
+          // Add file->block mapping
+          for (int i = 0; i < nrBlocks; i++)
+            namesystem.blocksMap.addINode(newNode.blocks[i], newNode);
+          return true;
+        } else {
+          return false;
         }
+      } catch (FileNotFoundException e ) {
+        return false;
+      }
     }
-
-    /** 
-     * Check whether the filepath could be created
-     */
-    public boolean isValidToCreate(UTF8 src) {
-        String srcs = normalizePath(src);
-        synchronized (rootDir) {
-            if (srcs.startsWith("/") && 
-                ! srcs.endsWith("/") && 
-                rootDir.getNode(srcs) == null) {
-                return true;
-            } else {
-                return false;
-            }
+  }
+    
+  boolean unprotectedAddFile(UTF8 path, Block[] blocks, short replication ) {
+    return unprotectedAddFile( path,  
+                               new INode( path.toString(), blocks, replication ));
+  }
+
+  /**
+   * Change the filename
+   */
+  public boolean renameTo(UTF8 src, UTF8 dst) {
+    NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
+                                  +src+" to "+dst );
+    waitForReady();
+    if( ! unprotectedRenameTo(src, dst) )
+      return false;
+    fsImage.getEditLog().logRename(src, dst);
+    return true;
+  }
+
+  /**
+   */
+  boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {
+    synchronized(rootDir) {
+      String srcStr = src.toString();
+      String dstStr = dst.toString();
+      INode renamedNode = rootDir.getNode(srcStr);
+      if (renamedNode == null) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                                     +"failed to rename "+src+" to "+dst+ " because source does not exist" );
+        return false;
+      }
+      if (isDir(dst)) {
+        dstStr += "/" + new File(srcStr).getName();
+      }
+      if( rootDir.getNode(dstStr.toString()) != null ) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                                     +"failed to rename "+src+" to "+dstStr+ " because destination exists" );
+        return false;
+      }
+      renamedNode.removeNode();
+            
+      // the renamed node can be reused now
+      try {
+        if( rootDir.addNode(dstStr, renamedNode ) != null ) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+                                        +src+" is renamed to "+dst );
+          return true;
+        }
+      } catch (FileNotFoundException e ) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                                     +"failed to rename "+src+" to "+dst );
+        try {
+          rootDir.addNode(srcStr, renamedNode); // put it back
+        }catch(FileNotFoundException e2) {                
         }
+      }
+
+      return false;
     }
+  }
 
-    /**
-     * Check whether the path specifies a directory
-     */
-    public boolean isDir(UTF8 src) {
-        synchronized (rootDir) {
-            INode node = rootDir.getNode(normalizePath(src));
-            return node != null && node.isDir();
+  /**
+   * Set file replication
+   * 
+   * @param src file name
+   * @param replication new replication
+   * @param oldReplication old replication - output parameter
+   * @return array of file blocks
+   * @throws IOException
+   */
+  Block[] setReplication( String src, 
+                          short replication,
+                          Vector oldReplication
+                          ) throws IOException {
+    waitForReady();
+    Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication );
+    if( fileBlocks != null )  // log replication change
+      fsImage.getEditLog().logSetReplication( src, replication );
+    return fileBlocks;
+  }
+
+  Block[] unprotectedSetReplication(  String src, 
+                                      short replication,
+                                      Vector oldReplication
+                                      ) throws IOException {
+    if( oldReplication == null )
+      oldReplication = new Vector();
+    oldReplication.setSize(1);
+    oldReplication.set( 0, new Integer(-1) );
+    Block[] fileBlocks = null;
+    synchronized(rootDir) {
+      INode fileNode = rootDir.getNode(src);
+      if (fileNode == null)
+        return null;
+      if( fileNode.isDir() )
+        return null;
+      oldReplication.set( 0, new Integer( fileNode.blockReplication ));
+      fileNode.blockReplication = replication;
+      fileBlocks = fileNode.blocks;
+    }
+    return fileBlocks;
+  }
+
+  /**
+   * Get the blocksize of a file
+   * @param filename the filename
+   * @return the number of bytes in the first block
+   * @throws IOException if it is a directory or does not exist.
+   */
+  public long getBlockSize(String filename) throws IOException {
+    synchronized (rootDir) {
+      INode fileNode = rootDir.getNode(filename);
+      if (fileNode == null) {
+        throw new IOException("Unknown file: " + filename);
+      }
+      if (fileNode.isDir()) {
+        throw new IOException("Getting block size of a directory: " + 
+                              filename);
+      }
+      return fileNode.getBlockSize();
+    }
+  }
+    
+  /**
+   * Remove the file from management, return blocks
+   */
+  public Block[] delete(UTF8 src) {
+    NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "
+                                  +src );
+    waitForReady();
+    Block[] blocks = unprotectedDelete(src); 
+    if( blocks != null )
+      fsImage.getEditLog().logDelete( src );
+    return blocks;
+  }
+
+  /**
+   */
+  Block[] unprotectedDelete(UTF8 src) {
+    synchronized (rootDir) {
+      INode targetNode = rootDir.getNode(src.toString());
+      if (targetNode == null) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
+                                     +"failed to remove "+src+" because it does not exist" );
+        return null;
+      } else {
+        //
+        // Remove the node from the namespace and GC all
+        // the blocks underneath the node.
+        //
+        if (! targetNode.removeNode()) {
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
+                                       +"failed to remove "+src+" because it does not have a parent" );
+          return null;
+        } else {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+                                        +src+" is removed" );
+          Vector v = new Vector();
+          targetNode.collectSubtreeBlocks(v);
+          for (Iterator it = v.iterator(); it.hasNext(); ) {
+            Block b = (Block) it.next();
+            namesystem.blocksMap.removeINode(b);
+          }
+          return (Block[]) v.toArray(new Block[v.size()]);
         }
+      }
     }
+  }
 
-    /**
-     * Create directory entries for every item
-     */
-    boolean mkdirs(String src) {
-        src = normalizePath(new UTF8(src));
-
-        // Use this to collect all the dirs we need to construct
-        Vector v = new Vector();
-
-        // The dir itself
-        v.add(src);
-
-        // All its parents
-        Path parent = new Path(src).getParent();
-        while (parent != null) {
-            v.add(parent.toString());
-            parent = parent.getParent();
+  /**
+   */
+  public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {
+    TreeSet holders = (TreeSet) activeLocks.get(src);
+    if (holders == null) {
+      holders = new TreeSet();
+      activeLocks.put(src, holders);
+    }
+    if (exclusive && holders.size() > 0) {
+      return STILL_WAITING;
+    } else {
+      holders.add(holder);
+      return COMPLETE_SUCCESS;
+    }
+  }
+
+  /**
+   */
+  public int releaseLock(UTF8 src, UTF8 holder) {
+    TreeSet holders = (TreeSet) activeLocks.get(src);
+    if (holders != null && holders.contains(holder)) {
+      holders.remove(holder);
+      if (holders.size() == 0) {
+        activeLocks.remove(src);
+      }
+      return COMPLETE_SUCCESS;
+    } else {
+      return OPERATION_FAILED;
+    }
+  }
+
+  /**
+   * Get a listing of files given path 'src'
+   *
+   * This function is admittedly very inefficient right now.  We'll
+   * make it better later.
+   */
+  public DFSFileInfo[] getListing(UTF8 src) {
+    String srcs = normalizePath(src);
+
+    synchronized (rootDir) {
+      INode targetNode = rootDir.getNode(srcs);
+      if (targetNode == null) {
+        return null;
+      } else {
+        Vector contents = new Vector();
+        targetNode.listContents(contents);
+
+        DFSFileInfo listing[] = new DFSFileInfo[contents.size()];
+        int i = 0;
+        for (Iterator it = contents.iterator(); it.hasNext(); i++) {
+          listing[i] = new DFSFileInfo( (INode) it.next() );
         }
+        return listing;
+      }
+    }
+  }
 
-        // Now go backwards through list of dirs, creating along
-        // the way
-        int numElts = v.size();
-        for (int i = numElts - 1; i >= 0; i--) {
-            String cur = (String) v.elementAt(i);
-            try {
-               INode inserted = unprotectedMkdir(cur);
-               if (inserted != null) {
-                   NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
-                        +"created directory "+cur );
-                   fsImage.getEditLog().logMkDir( inserted );
-               } else { // otherwise cur exists, verify that it is a directory
-                 if (!isDir(new UTF8(cur))) {
-                   NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
-                        +"path " + cur + " is not a directory ");
-                   return false;
-                 } 
-               }
-            } catch (FileNotFoundException e ) {
-                NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
-                        +"failed to create directory "+src);
-                return false;
-            }
-        }
+  /**
+   * Get the blocks associated with the file
+   */
+  public Block[] getFile(UTF8 src) {
+    waitForReady();
+    synchronized (rootDir) {
+      INode targetNode = rootDir.getNode(src.toString());
+      if (targetNode == null) {
+        return null;
+      } else {
+        return targetNode.blocks;
+      }
+    }
+  }
+
+  /** 
+   * Check whether the filepath could be created
+   */
+  public boolean isValidToCreate(UTF8 src) {
+    String srcs = normalizePath(src);
+    synchronized (rootDir) {
+      if (srcs.startsWith("/") && 
+          ! srcs.endsWith("/") && 
+          rootDir.getNode(srcs) == null) {
         return true;
+      } else {
+        return false;
+      }
     }
+  }
 
-    /**
-     */
-    INode unprotectedMkdir(String src) throws FileNotFoundException {
-        synchronized (rootDir) {
-            return rootDir.addNode(src, new INode(new File(src).getName()));
+  /**
+   * Check whether the path specifies a directory
+   */
+  public boolean isDir(UTF8 src) {
+    synchronized (rootDir) {
+      INode node = rootDir.getNode(normalizePath(src));
+      return node != null && node.isDir();
+    }
+  }
+
+  /**
+   * Create directory entries for every item
+   */
+  boolean mkdirs(String src) {
+    src = normalizePath(new UTF8(src));
+
+    // Use this to collect all the dirs we need to construct
+    Vector v = new Vector();
+
+    // The dir itself
+    v.add(src);
+
+    // All its parents
+    Path parent = new Path(src).getParent();
+    while (parent != null) {
+      v.add(parent.toString());
+      parent = parent.getParent();
+    }
+
+    // Now go backwards through list of dirs, creating along
+    // the way
+    int numElts = v.size();
+    for (int i = numElts - 1; i >= 0; i--) {
+      String cur = (String) v.elementAt(i);
+      try {
+        INode inserted = unprotectedMkdir(cur);
+        if (inserted != null) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+                                        +"created directory "+cur );
+          fsImage.getEditLog().logMkDir( inserted );
+        } else { // otherwise cur exists, verify that it is a directory
+          if (!isDir(new UTF8(cur))) {
+            NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+                                          +"path " + cur + " is not a directory ");
+            return false;
+          } 
         }
+      } catch (FileNotFoundException e ) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+                                      +"failed to create directory "+src);
+        return false;
+      }
     }
+    return true;
+  }
 
-    /**
-     */
-    String normalizePath(UTF8 src) {
-        String srcs = src.toString();
-        if (srcs.length() > 1 && srcs.endsWith("/")) {
-            srcs = srcs.substring(0, srcs.length() - 1);
-        }
-        return srcs;
+  /**
+   */
+  INode unprotectedMkdir(String src) throws FileNotFoundException {
+    synchronized (rootDir) {
+      return rootDir.addNode(src, new INode(new File(src).getName()));
+    }
+  }
+
+  /**
+   */
+  String normalizePath(UTF8 src) {
+    String srcs = src.toString();
+    if (srcs.length() > 1 && srcs.endsWith("/")) {
+      srcs = srcs.substring(0, srcs.length() - 1);
     }
+    return srcs;
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Mon Apr 16 14:44:35 2007
@@ -155,19 +155,19 @@
    * remain, then raise an exception that will possibly cause the
    * server to exit
    */
-   void processIOError(int index) throws IOException {
-     if (editStreams == null || editStreams.size() == 1) {
-       throw new IOException("Checkpoint directories inaccessible.");
-     }
-     assert(index < getNumStorageDirs());
-     assert(getNumStorageDirs() == editStreams.size());
-
-     editStreams.remove( index );
-     //
-     // Invoke the ioerror routine of the fsimage
-     //
-     fsimage.processIOError(index);
-   }
+  void processIOError(int index) throws IOException {
+    if (editStreams == null || editStreams.size() == 1) {
+      throw new IOException("Checkpoint directories inaccessible.");
+    }
+    assert(index < getNumStorageDirs());
+    assert(getNumStorageDirs() == editStreams.size());
+
+    editStreams.remove( index );
+    //
+    // Invoke the ioerror routine of the fsimage
+    //
+    fsimage.processIOError(index);
+  }
 
   /**
    * check if ANY edits.new log exists
@@ -194,8 +194,8 @@
     
     if (edits != null) {
       DataInputStream in = new DataInputStream(
-          new BufferedInputStream(
-              new FileInputStream(edits)));
+                                               new BufferedInputStream(
+                                                                       new FileInputStream(edits)));
       // Read log file version. Could be missing. 
       in.mark( 4 );
       // If edits log is greater than 2G, available method will return negative
@@ -214,10 +214,10 @@
           logVersion = in.readInt();
         if( logVersion < FSConstants.LAYOUT_VERSION ) // future version
           throw new IOException(
-              "Unexpected version of the file system log file: "
-              + logVersion
-              + ". Current version = " 
-              + FSConstants.LAYOUT_VERSION + "." );
+                                "Unexpected version of the file system log file: "
+                                + logVersion
+                                + ". Current version = " 
+                                + FSConstants.LAYOUT_VERSION + "." );
       }
       
       short replication = fsNamesys.getDefaultReplication();
@@ -245,10 +245,10 @@
               writables = aw.get(); 
               if( writables.length != 2 )
                 throw new IOException("Incorrect data fortmat. " 
-                    + "Name & replication pair expected");
+                                      + "Name & replication pair expected");
               name = (UTF8) writables[0];
               replication = Short.parseShort(
-                  ((UTF8)writables[1]).toString());
+                                             ((UTF8)writables[1]).toString());
               replication = adjustReplication( replication );
             }
             // get blocks
@@ -268,8 +268,8 @@
             repl.readFields(in);
             replication = adjustReplication( fromLogReplication(repl) );
             fsDir.unprotectedSetReplication(src.toString(), 
-                replication,
-                null);
+                                            replication,
+                                            null);
             break;
           } 
           case OP_RENAME: {
@@ -295,7 +295,7 @@
           case OP_DATANODE_ADD: {
             if( logVersion > -3 )
               throw new IOException("Unexpected opcode " + opcode 
-                  + " for version " + logVersion );
+                                    + " for version " + logVersion );
             FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
             nodeimage.readFields(in);
             DatanodeDescriptor node = nodeimage.getDatanodeDescriptor();
@@ -305,7 +305,7 @@
           case OP_DATANODE_REMOVE: {
             if( logVersion > -3 )
               throw new IOException("Unexpected opcode " + opcode 
-                  + " for version " + logVersion );
+                                    + " for version " + logVersion );
             DatanodeID nodeID = new DatanodeID();
             nodeID.readFields(in);
             DatanodeDescriptor node = fsNamesys.getDatanode( nodeID );
@@ -379,8 +379,8 @@
    */
   void logCreateFile( FSDirectory.INode newNode ) {
     UTF8 nameReplicationPair[] = new UTF8[] { 
-                        new UTF8( newNode.computeName() ), 
-                        FSEditLog.toLogReplication( newNode.getReplication() )};
+      new UTF8( newNode.computeName() ), 
+      FSEditLog.toLogReplication( newNode.getReplication() )};
     logEdit(OP_ADD,
             new ArrayWritable( UTF8.class, nameReplicationPair ), 
             new ArrayWritable( Block.class, newNode.getBlocks() ));
@@ -524,6 +524,6 @@
    * Return the name of the edit file
    */
   File getFsEditName() throws IOException {
-      return getEditFile( 0 );
+    return getEditFile( 0 );
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Mon Apr 16 14:44:35 2007
@@ -131,14 +131,14 @@
    */
   void recoverTransitionRead( Collection<File> dataDirs,
                               StartupOption startOpt
-                            ) throws IOException {
+                              ) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     this.storageDirs = new ArrayList<StorageDirectory>( dataDirs.size() );
     AbstractList<StorageState> dataDirStates = 
-                                new ArrayList<StorageState>( dataDirs.size() );
+      new ArrayList<StorageState>( dataDirs.size() );
     boolean isFormatted = false;
     for( Iterator<File> it = dataDirs.iterator(); it.hasNext(); ) {
       File dataDir = it.next();
@@ -151,7 +151,7 @@
         case NON_EXISTENT:
           // name-node fails if any of the configured storage dirs are missing
           throw new InconsistentFSStateException( sd.root,
-              "storage directory does not exist or is not accessible." );
+                                                  "storage directory does not exist or is not accessible." );
         case NOT_FORMATTED:
           break;
         case CONVERT:
@@ -179,16 +179,16 @@
 
     if( dataDirs.size() == 0 )  // none of the data dirs exist
       throw new IOException( 
-          "All specified directories are not accessible or do not exist." );
+                            "All specified directories are not accessible or do not exist." );
     if( ! isFormatted && startOpt != StartupOption.ROLLBACK )
       throw new IOException( "NameNode is not formatted." );
     if( startOpt != StartupOption.UPGRADE
         && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
         && layoutVersion != FSConstants.LAYOUT_VERSION )
       throw new IOException( 
-          "\nFile system image contains an old layout version " + layoutVersion
-          + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
-          + " is required.\nPlease restart NameNode with -upgrade option." );
+                            "\nFile system image contains an old layout version " + layoutVersion
+                            + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
+                            + " is required.\nPlease restart NameNode with -upgrade option." );
 
     // 2. Format unformatted dirs.
     this.checkpointTime = 0L;
@@ -233,8 +233,8 @@
       StorageDirectory sd = getStorageDir( idx );
       if( sd.getPreviousDir().exists() )
         throw new InconsistentFSStateException( sd.root,
-          "previous fs state should not exist during upgrade. "
-            + "Finalize or rollback first." );
+                                                "previous fs state should not exist during upgrade. "
+                                                + "Finalize or rollback first." );
     }
 
     // load the latest image
@@ -249,10 +249,10 @@
     for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
       StorageDirectory sd = getStorageDir( idx );
       LOG.info( "Upgrading image directory " + sd.root 
-              + ".\n   old LV = " + oldLV
-              + "; old CTime = " + oldCTime
-              + ".\n   new LV = " + this.getLayoutVersion()
-              + "; new CTime = " + this.getCTime() );
+                + ".\n   old LV = " + oldLV
+                + "; old CTime = " + oldCTime
+                + ".\n   new LV = " + this.getLayoutVersion()
+                + "; new CTime = " + this.getCTime() );
       File curDir = sd.getCurrentDir();
       File prevDir = sd.getPreviousDir();
       File tmpDir = sd.getPreviousTmp();
@@ -288,7 +288,7 @@
       File prevDir = sd.getPreviousDir();
       if( ! prevDir.exists() ) {  // use current directory then
         LOG.info( "Storage directory " + sd.root
-                + " does not contain previous fs state." );
+                  + " does not contain previous fs state." );
         sd.read(); // read and verify consistency with other directories
         continue;
       }
@@ -298,7 +298,7 @@
     }
     if( ! canRollback )
       throw new IOException( "Cannot rollback. " 
-            + "None of the storage directories contain previous fs state." );
+                             + "None of the storage directories contain previous fs state." );
 
     // Now that we know all directories are going to be consistent
     // Do rollback for each directory containing previous state
@@ -309,8 +309,8 @@
         continue;
 
       LOG.info( "Rolling back storage directory " + sd.root 
-          + ".\n   new LV = " + prevState.getLayoutVersion()
-          + "; new CTime = " + prevState.getCTime() );
+                + ".\n   new LV = " + prevState.getLayoutVersion()
+                + "; new CTime = " + prevState.getCTime() );
       File tmpDir = sd.getRemovedTmp();
       assert ! tmpDir.exists() : "removed.tmp directory must not exist.";
       // rename current to tmp
@@ -332,9 +332,9 @@
     if( ! prevDir.exists() )
       return; // already discarded
     LOG.info( "Finalizing upgrade for storage directory " 
-            + sd.root 
-            + ".\n   cur LV = " + this.getLayoutVersion()
-            + "; cur CTime = " + this.getCTime() );
+              + sd.root 
+              + ".\n   cur LV = " + this.getLayoutVersion()
+              + "; cur CTime = " + this.getCTime() );
     assert sd.getCurrentDir().exists() : "Current directory must exist.";
     final File tmpDir = sd.getFinalizedTmp();
     // rename previous to tmp and remove
@@ -355,7 +355,7 @@
 
   protected void getFields( Properties props, 
                             StorageDirectory sd 
-                          ) throws IOException {
+                            ) throws IOException {
     super.getFields( props, sd );
     if( layoutVersion == 0 )
       throw new IOException("NameNode directory " 
@@ -389,7 +389,7 @@
    */
   protected void setFields( Properties props, 
                             StorageDirectory sd 
-                          ) throws IOException {
+                            ) throws IOException {
     super.setFields( props, sd );
     writeCheckpointTime( sd );
   }
@@ -406,7 +406,7 @@
     File timeFile = getImageFile( sd, NameNodeFile.TIME );
     if (timeFile.exists()) { timeFile.delete(); }
     DataOutputStream out = new DataOutputStream(
-          new FileOutputStream(timeFile));
+                                                new FileOutputStream(timeFile));
     try {
       out.writeLong( checkpointTime );
     } finally {
@@ -439,10 +439,10 @@
     // check consistency of the old storage
     if( ! oldImageDir.isDirectory() )
       throw new InconsistentFSStateException( sd.root,
-          oldImageDir + " is not a directory." );
+                                              oldImageDir + " is not a directory." );
     if( ! oldImageDir.canWrite() )
       throw new InconsistentFSStateException( sd.root,
-          oldImageDir + " is not writable." );
+                                              oldImageDir + " is not writable." );
     return true;
   }
   
@@ -454,8 +454,8 @@
     File oldImage = new File( oldImageDir, "fsimage" );
     
     LOG.info( "Old layout version directory " + oldImageDir
-            + " is found. New layout version is "
-            + FSConstants.LAYOUT_VERSION );
+              + " is found. New layout version is "
+              + FSConstants.LAYOUT_VERSION );
     LOG.info( "Trying to convert ..." );
 
     // we did not use locking for the pre upgrade layout, so we cannot prevent 
@@ -603,8 +603,8 @@
     boolean needToSave = true;
     int imgVersion = this.getLayoutVersion();
     DataInputStream in = new DataInputStream(
-                            new BufferedInputStream(
-                                new FileInputStream(curFile)));
+                                             new BufferedInputStream(
+                                                                     new FileInputStream(curFile)));
     try {
       /*
        * TODO we need to change format of the image file
@@ -685,8 +685,8 @@
     // Write out data
     //
     DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(
-          new FileOutputStream(newFile)));
+                                                new BufferedOutputStream(
+                                                                         new FileOutputStream(newFile)));
     try {
       out.writeInt(FSConstants.LAYOUT_VERSION);
       out.writeInt(namespaceID);
@@ -745,7 +745,7 @@
       sd.unlock();
     }
     LOG.info( "Storage directory " + sd.root 
-        + " has been successfully formatted." );
+              + " has been successfully formatted." );
   }
 
   public void format() throws IOException {
@@ -780,7 +780,7 @@
       }
     }
     for(Iterator<INode> it = root.getChildIterator(); it != null &&
-                                                      it.hasNext(); ) {
+          it.hasNext(); ) {
       saveImage( fullName, it.next(), out );
     }
   }