You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/12/03 22:39:41 UTC

svn commit: r1042017 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/

Author: hairong
Date: Fri Dec  3 21:39:40 2010
New Revision: 1042017

URL: http://svn.apache.org/viewvc?rev=1042017&view=rev
Log:
HDFS-1506. Refactor fsimage loading code. Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Dec  3 21:39:40 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
     HDFS-1518. Wrong description in FSNamesystem's javadoc. 
     (Jingguo Yao via eli)
 
+    HDFS-1506. Refactor fsimage loading code. (hairong)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Fri Dec  3 21:39:40 2010
@@ -40,6 +40,15 @@ class BlockInfo extends Block implements
    */
   private Object[] triplets;
 
+  /**
+   * Construct a blocksmap entry with 3*replication triplet slots and no inode
+   * @param replication the block's replication factor
+   */
+  protected BlockInfo(int replication) {
+    this.triplets = new Object[3*replication];
+    this.inode = null;
+  }
+  
   protected BlockInfo(Block blk, int replication) {
     super(blk);
     this.triplets = new Object[3*replication];

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Dec  3 21:39:40 2010
@@ -287,39 +287,9 @@ class FSDirectory implements Closeable {
 
   }
 
-  INodeDirectory addToParent( byte[][] src,
-                              INodeDirectory parentINode,
-                              PermissionStatus permissions,
-                              Block[] blocks, 
-                              String symlink,
-                              short replication,
-                              long modificationTime,
-                              long atime,
-                              long nsQuota,
-                              long dsQuota,
-                              long preferredBlockSize,
-                              boolean propagateModTime) 
-                              throws UnresolvedLinkException {
+  INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
+      INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
     // NOTE: This does not update space counts for parents
-    // create new inode
-    INode newNode;
-    if (blocks == null) {
-      if (nsQuota >= 0 || dsQuota >= 0) {
-        newNode = new INodeDirectoryWithQuota(
-            permissions, modificationTime, nsQuota, dsQuota);
-      } else {
-        newNode = new INodeDirectory(permissions, modificationTime);
-      }
-    } else  {
-      if (symlink.length() != 0) {
-        newNode = new INodeSymlink(symlink, modificationTime, atime, permissions);
-        ((INodeSymlink)newNode).setLinkValue(symlink);
-      } else {
-        newNode = new INodeFile(permissions, blocks.length, replication,
-                                modificationTime, atime, preferredBlockSize);
-      }
-    }
-    // add new node to the parent
     INodeDirectory newParent = null;
     writeLock();
     try {
@@ -332,14 +302,12 @@ class FSDirectory implements Closeable {
       }
       if(newParent == null)
         return null;
-      if(blocks != null) {
-        int nrBlocks = blocks.length;
+      if(!newNode.isDirectory() && !newNode.isLink()) {
         // Add file->block mapping
-        assert !newNode.isLink();
         INodeFile newF = (INodeFile)newNode;
-        for (int i = 0; i < nrBlocks; i++) {
-          BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
-          newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
+        BlockInfo[] blocks = newF.getBlocks();
+        for (int i = 0; i < blocks.length; i++) {
+          newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
         }
       }
     } finally {
@@ -1302,6 +1270,22 @@ class FSDirectory implements Closeable {
     }
   }
   
+  /**
+   * Get the parent node of path, looked up from rootDir under the read lock.
+   * 
+   * @param path the path to explore, as an array of path components
+   * @return its parent node, as returned by rootDir.getParent(path)
+   */
+  INodeDirectory getParent(byte[][] path) 
+    throws FileNotFoundException, UnresolvedLinkException {
+    readLock();
+    try {
+      return rootDir.getParent(path);
+    } finally {
+      readUnlock();
+    }
+  }
+  
   /** 
    * Check whether the filepath could be created
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Dec  3 21:39:40 2010
@@ -1049,8 +1049,9 @@ public class FSImage extends Storage {
    * "re-save" and consolidate the edit-logs
    */
   boolean loadFSImage(File curFile) throws IOException {
-    FSImageFormat.Loader loader = new FSImageFormat.Loader(conf);
-    loader.load(curFile, getFSNamesystem());
+    FSImageFormat.Loader loader = new FSImageFormat.Loader(
+        conf, getFSNamesystem());
+    loader.load(curFile);
 
 
     // Check that the image digest we loaded matches up with what

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Dec  3 21:39:40 2010
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.MD5Hash;
@@ -55,6 +54,8 @@ public abstract class FSImageFormat {
    */
   public static class Loader {
     private final Configuration conf;
+    /** which namesystem this loader is working for */
+    private final FSNamesystem namesystem;
 
     /** Set to true once a file has been loaded using this loader. */
     private boolean loaded = false;
@@ -66,8 +67,9 @@ public abstract class FSImageFormat {
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
 
-    public Loader(Configuration conf) {
+    public Loader(Configuration conf, FSNamesystem namesystem) {
       this.conf = conf;
+      this.namesystem = namesystem;
     }
 
     /**
@@ -115,14 +117,13 @@ public abstract class FSImageFormat {
       }
     }
 
-    void load(File curFile, FSNamesystem targetNamesystem)
+    void load(File curFile)
       throws IOException
     {
       checkNotLoaded();
       assert curFile != null : "curFile is null";
 
       long startTime = now();
-      FSDirectory fsDir = targetNamesystem.dir;
 
       //
       // Load in bits
@@ -155,7 +156,7 @@ public abstract class FSImageFormat {
         // read in the last generation stamp.
         if (imgVersion <= -12) {
           long genstamp = in.readLong();
-          targetNamesystem.setGenerationStamp(genstamp); 
+          namesystem.setGenerationStamp(genstamp); 
         }
 
         // read compression related info
@@ -169,109 +170,17 @@ public abstract class FSImageFormat {
 
         LOG.info("Loading image file " + curFile + " using " + compression);
 
-
-        // read file info
-        short replication = targetNamesystem.getDefaultReplication();
-
+        // load all inodes
         LOG.info("Number of files = " + numFiles);
-
-        byte[][] pathComponents;
-        byte[][] parentPath = {{}};
-        INodeDirectory parentINode = fsDir.rootDir;
-        for (long i = 0; i < numFiles; i++) {
-          long modificationTime = 0;
-          long atime = 0;
-          long blockSize = 0;
-          pathComponents = FSImageSerialization.readPathComponents(in);
-          replication = in.readShort();
-          replication = targetNamesystem.adjustReplication(replication);
-          modificationTime = in.readLong();
-          if (imgVersion <= -17) {
-            atime = in.readLong();
-          }
-          if (imgVersion <= -8) {
-            blockSize = in.readLong();
-          }
-          int numBlocks = in.readInt();
-          Block blocks[] = null;
-
-          // for older versions, a blocklist of size 0
-          // indicates a directory.
-          if ((-9 <= imgVersion && numBlocks > 0) ||
-              (imgVersion < -9 && numBlocks >= 0)) {
-            blocks = new Block[numBlocks];
-            for (int j = 0; j < numBlocks; j++) {
-              blocks[j] = new Block();
-              if (-14 < imgVersion) {
-                blocks[j].set(in.readLong(), in.readLong(), 
-                              GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-              } else {
-                blocks[j].readFields(in);
-              }
-            }
-          }
-          // Older versions of HDFS does not store the block size in inode.
-          // If the file has more than one block, use the size of the 
-          // first block as the blocksize. Otherwise use the default block size.
-          //
-          if (-8 <= imgVersion && blockSize == 0) {
-            if (numBlocks > 1) {
-              blockSize = blocks[0].getNumBytes();
-            } else {
-              long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
-              blockSize = Math.max(targetNamesystem.getDefaultBlockSize(), first);
-            }
-          }
-          
-          // get quota only when the node is a directory
-          long nsQuota = -1L;
-          if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
-            nsQuota = in.readLong();
-          }
-          long dsQuota = -1L;
-          if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
-            dsQuota = in.readLong();
-          }
-
-          // Read the symlink only when the node is a symlink
-          String symlink = "";
-          if (imgVersion <= -23 && numBlocks == -2) {
-            symlink = Text.readString(in);
-          }
-          
-          PermissionStatus permissions = targetNamesystem.getUpgradePermission();
-          if (imgVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-          
-          if (isRoot(pathComponents)) { // it is the root
-            // update the root's attributes
-            if (nsQuota != -1 || dsQuota != -1) {
-              fsDir.rootDir.setQuota(nsQuota, dsQuota);
-            }
-            fsDir.rootDir.setModificationTime(modificationTime);
-            fsDir.rootDir.setPermissionStatus(permissions);
-            continue;
-          }
-          // check if the new inode belongs to the same parent
-          if(!isParent(pathComponents, parentPath)) {
-            parentINode = null;
-            parentPath = getParent(pathComponents);
-          }
-          // add new inode
-          // without propagating modification time to parent
-          parentINode = fsDir.addToParent(pathComponents, parentINode, permissions,
-                                          blocks, symlink, replication, modificationTime, 
-                                          atime, nsQuota, dsQuota, blockSize, false);
-        }
+        loadFullNameINodes(numFiles, in);
 
         // load datanode info
         this.loadDatanodes(in);
 
         // load Files Under Construction
-        this.loadFilesUnderConstruction(in, targetNamesystem);
+        this.loadFilesUnderConstruction(in);
 
-        this.loadSecretManagerState(in, targetNamesystem);
+        this.loadSecretManagerState(in);
 
         // make sure to read to the end of file
         int eof = in.read();
@@ -287,6 +196,128 @@ public abstract class FSImageFormat {
           + (now() - startTime)/1000 + " seconds.");
     }
 
+  /** Copy quota (unless both are -1), mtime and permissions from root to fsDir.rootDir */
+  private void updateRootAttr(INode root) {                                                           
+    long nsQuota = root.getNsQuota();
+    long dsQuota = root.getDsQuota();
+    FSDirectory fsDir = namesystem.dir;
+    if (nsQuota != -1 || dsQuota != -1) {
+      fsDir.rootDir.setQuota(nsQuota, dsQuota);
+    }
+    fsDir.rootDir.setModificationTime(root.getModificationTime());
+    fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());    
+  }
+
+  /**
+   * Load numFiles inodes from an image that stores each inode with its full path.
+   * The last resolved parent directory is reused while consecutive inodes share it.
+   * @param numFiles total number of files to load
+   * @param in data input stream
+   * @throws IOException if any error occurs
+   */
+  private void loadFullNameINodes(long numFiles,
+      DataInputStream in) throws IOException {
+    byte[][] pathComponents;
+    byte[][] parentPath = {{}};      
+    FSDirectory fsDir = namesystem.dir;
+    INodeDirectory parentINode = fsDir.rootDir;
+    for (long i = 0; i < numFiles; i++) {
+      pathComponents = FSImageSerialization.readPathComponents(in);
+      INode newNode = loadINode(in);
+
+      if (isRoot(pathComponents)) { // it is the root
+        // the root is not re-added: only its loaded attributes are copied over
+        updateRootAttr(newNode);
+        continue;
+      }
+      // re-resolve the parent only when this inode is not under parentPath
+      if(!isParent(pathComponents, parentPath)) {
+        parentINode = fsDir.getParent(pathComponents);
+        parentPath = getParent(pathComponents);
+      }
+
+      // add new inode under its local name, with propagateModTime == false
+      parentINode = fsDir.addToParent(pathComponents[pathComponents.length-1], 
+          parentINode, newNode, false);
+    }
+  }
+
+  /**
+   * load an inode from fsimage except for its name
+   * Which fields are read depends on imgVersion and on the node type.
+   * @param in data input stream from which image is read
+   * @return an inode built by INode.newINode from the fields read
+   */
+  private INode loadINode(DataInputStream in)
+  throws IOException {
+    long modificationTime = 0;
+    long atime = 0;
+    long blockSize = 0;
+    
+    short replication = in.readShort();
+    replication = namesystem.adjustReplication(replication);
+    modificationTime = in.readLong();
+    if (imgVersion <= -17) {
+      atime = in.readLong();
+    }
+    if (imgVersion <= -8) {
+      blockSize = in.readLong();
+    }
+    int numBlocks = in.readInt();
+    BlockInfo blocks[] = null;
+
+    // for older versions, a blocklist of size 0
+    // indicates a directory.
+    if ((-9 <= imgVersion && numBlocks > 0) ||
+        (imgVersion < -9 && numBlocks >= 0)) {
+      blocks = new BlockInfo[numBlocks];
+      for (int j = 0; j < numBlocks; j++) {
+        blocks[j] = new BlockInfo(replication);
+        if (-14 < imgVersion) {
+          blocks[j].set(in.readLong(), in.readLong(), 
+                        GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+        } else {
+          blocks[j].readFields(in);
+        }
+      }
+    }
+    // Older versions of HDFS do not store the block size in the inode.
+    // If the file has more than one block, use the size of the 
+    // first block as the blocksize. Otherwise use the default block size.
+    //
+    if (-8 <= imgVersion && blockSize == 0) {
+      if (numBlocks > 1) {
+        blockSize = blocks[0].getNumBytes();
+      } else {
+        long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
+        blockSize = Math.max(namesystem.getDefaultBlockSize(), first);
+      }
+    }
+    
+    // get quota only when the node is a directory (numBlocks == -1)
+    long nsQuota = -1L;
+    if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
+        nsQuota = in.readLong();
+      }
+      long dsQuota = -1L;
+      if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
+        dsQuota = in.readLong();
+      }
+  
+      // Read the symlink only when the node is a symlink (numBlocks == -2)
+      String symlink = "";
+      if (imgVersion <= -23 && numBlocks == -2) {
+        symlink = Text.readString(in);
+      }
+      
+      // default to the upgrade permission when the image stores none
+      PermissionStatus permissions = namesystem.getUpgradePermission();
+      if (imgVersion <= -11) {
+        permissions = PermissionStatus.read(in);
+      }
+  
+      return INode.newINode(permissions, blocks, symlink, replication,
+          modificationTime, atime, nsQuota, dsQuota, blockSize);
+    }
 
     private void loadDatanodes(DataInputStream in) throws IOException {
       if (imgVersion > -3) // pre datanode image version
@@ -301,9 +332,9 @@ public abstract class FSImageFormat {
       }
     }
 
-    private void loadFilesUnderConstruction(DataInputStream in, 
-        FSNamesystem fs) throws IOException {
-      FSDirectory fsDir = fs.dir;
+    private void loadFilesUnderConstruction(DataInputStream in)
+    throws IOException {
+      FSDirectory fsDir = namesystem.dir;
       if (imgVersion > -13) // pre lease image version
         return;
       int size = in.readInt();
@@ -325,18 +356,17 @@ public abstract class FSImageFormat {
         }
         INodeFile oldnode = (INodeFile) old;
         fsDir.replaceNode(path, oldnode, cons);
-        fs.leaseManager.addLease(cons.getClientName(), path); 
+        namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }
 
-    private void loadSecretManagerState(DataInputStream in, 
-        FSNamesystem fs) throws IOException {
+    private void loadSecretManagerState(DataInputStream in) throws IOException {
       if (imgVersion > -23) {
         //SecretManagerState is not available.
         //This must not happen if security is turned on.
         return; 
       }
-      fs.loadSecretManagerState(in);
+      namesystem.loadSecretManagerState(in);
     }
 
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Dec  3 21:39:40 2010
@@ -429,4 +429,44 @@ abstract class INode implements Comparab
     }
     return len1 - len2;
   }
+  
+  /**
+   * Create an INode; the inode's name is not set yet
+   * 
+   * @param permissions permissions
+   * @param blocks blocks if a file; null for a directory or a symbolic link
+   * @param symlink symbolic link target, or "" if not a symbolic link
+   * @param replication replication factor
+   * @param modificationTime modification time
+   * @param atime access time
+   * @param nsQuota namespace quota
+   * @param dsQuota disk space quota
+   * @param preferredBlockSize block size
+   * @return an inode
+   */
+  static INode newINode(PermissionStatus permissions,
+                        BlockInfo[] blocks,
+                        String symlink,
+                        short replication,
+                        long modificationTime,
+                        long atime,
+                        long nsQuota,
+                        long dsQuota,
+                        long preferredBlockSize) {
+    // a symlink has no block list, so test it before the null-blocks check
+    if (symlink.length() != 0) {
+      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+    }
+    if (blocks == null) {
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        return new INodeDirectoryWithQuota(
+            permissions, modificationTime, nsQuota, dsQuota);
+      } 
+      // regular directory
+      return new INodeDirectory(permissions, modificationTime);
+    }
+    // file
+    return new INodeFile(permissions, blocks, replication,
+        modificationTime, atime, preferredBlockSize);
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1042017&r1=1042016&r2=1042017&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Dec  3 21:39:40 2010
@@ -320,7 +320,7 @@ class INodeDirectory extends INode {
   <T extends INode> T addNode(String path, T newNode, boolean inheritPermission
       ) throws FileNotFoundException, UnresolvedLinkException  {
     byte[][] pathComponents = getPathComponents(path);        
-    if(addToParent(pathComponents, newNode, null,
+    if(addToParent(pathComponents, newNode,
                     inheritPermission, true) == null)
       return null;
     return newNode;
@@ -335,35 +335,62 @@ class INodeDirectory extends INode {
    * @throws  FileNotFoundException if parent does not exist or 
    *          is not a directory.
    */
-  <T extends INode> INodeDirectory addToParent(
-                                      byte[][] pathComponents,
-                                      T newNode,
-                                      INodeDirectory parent,
-                                      boolean inheritPermission,
-                                      boolean propagateModTime
-                                    ) throws FileNotFoundException, 
-                                             UnresolvedLinkException {
-              
+  INodeDirectory addToParent( byte[] localname,
+                              INode newNode,
+                              INodeDirectory parent,
+                              boolean inheritPermission,
+                              boolean propagateModTime
+                              ) throws FileNotFoundException, 
+                                       UnresolvedLinkException {
+    // name the new node, then insert it into the given parent's children list
+    newNode.name = localname;
+    if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
+      return null;
+    return parent;
+  }
+
+  INodeDirectory getParent(byte[][] pathComponents)
+  throws FileNotFoundException, UnresolvedLinkException {
     int pathLen = pathComponents.length;
     if (pathLen < 2)  // add root
       return null;
-    if(parent == null) {
-      // Gets the parent INode
-      INode[] inodes  = new INode[2];
-      getExistingPathINodes(pathComponents, inodes, false);
-      INode inode = inodes[0];
-      if (inode == null) {
-        throw new FileNotFoundException("Parent path does not exist: "+
-            DFSUtil.byteArray2String(pathComponents));
-      }
-      if (!inode.isDirectory()) {
-        throw new FileNotFoundException("Parent path is not a directory: "+
-            DFSUtil.byteArray2String(pathComponents));
-      }
-      parent = (INodeDirectory)inode;
+    // Gets the parent INode
+    INode[] inodes  = new INode[2];
+    getExistingPathINodes(pathComponents, inodes, false);
+    INode inode = inodes[0];
+    if (inode == null) {
+      throw new FileNotFoundException("Parent path does not exist: "+
+          DFSUtil.byteArray2String(pathComponents));
     }
-    // insert into the parent children list
+    if (!inode.isDirectory()) {
+      throw new FileNotFoundException("Parent path is not a directory: "+
+          DFSUtil.byteArray2String(pathComponents));
+    }
+    return (INodeDirectory)inode;
+  }
+  
+  /**
+   * Add new inode 
+   * Optimized version of addNode()
+   * 
+   * @return  parent INode if new inode is inserted
+   *          or null if it already exists.
+   * @throws  FileNotFoundException if parent does not exist or 
+   *          is not a directory.
+   */
+  INodeDirectory addToParent( byte[][] pathComponents,
+                              INode newNode,
+                              boolean inheritPermission,
+                              boolean propagateModTime
+                            ) throws FileNotFoundException, 
+                                     UnresolvedLinkException {
+              
+    int pathLen = pathComponents.length;
+    if (pathLen < 2)  // add root
+      return null;
     newNode.name = pathComponents[pathLen-1];
+    // insert into the parent children list
+    INodeDirectory parent = getParent(pathComponents);
     if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
       return null;
     return parent;