You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/11/06 00:56:53 UTC

svn commit: r1406014 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/namenode/

Author: szetszwo
Date: Mon Nov  5 23:56:53 2012
New Revision: 1406014

URL: http://svn.apache.org/viewvc?rev=1406014&view=rev
Log:
svn merge -c 1406006 from trunk for HDFS-4151. Change the methods in FSDirectory to pass INodesInPath instead of INode[] as a parameter.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1406006

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1406014&r1=1406013&r2=1406014&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Nov  5 23:56:53 2012
@@ -146,11 +146,14 @@ Trunk (Unreleased)
     HDFS-4110. Refine a log printed in JNStorage. (Liang Xie via suresh)
 
     HDFS-4124. Refactor INodeDirectory#getExistingPathINodes() to enable 
-    returningmore than INode array. (Jing Zhao via suresh)
+    returning more than INode array. (Jing Zhao via suresh)
 
     HDFS-4129. Add utility methods to dump NameNode in memory tree for 
     testing. (szetszwo via suresh)
 
+    HDFS-4151. Change the methods in FSDirectory to pass INodesInPath instead
+    of INode[] as a parameter. (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1406006

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1406014&r1=1406013&r2=1406014&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Nov  5 23:56:53 2012
@@ -331,22 +331,18 @@ public class FSDirectory implements Clos
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  BlockInfo addBlock(String path,
-                     INode[] inodes,
-                     Block block,
-                     DatanodeDescriptor targets[]
-  ) throws QuotaExceededException {
+  BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
+      DatanodeDescriptor targets[]) throws IOException {
     waitForReady();
 
     writeLock();
     try {
-      assert inodes[inodes.length-1].isUnderConstruction() :
-        "INode should correspond to a file under construction";
-      INodeFileUnderConstruction fileINode = 
-        (INodeFileUnderConstruction)inodes[inodes.length-1];
+      final INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction fileINode = 
+          INodeFileUnderConstruction.valueOf(inodes[inodes.length-1], path);
 
       // check quota limits and updated space consumed
-      updateCount(inodes, inodes.length-1, 0,
+      updateCount(inodesInPath, inodes.length-1, 0,
           fileINode.getPreferredBlockSize()*fileINode.getFileReplication(), true);
 
       // associate new last block for the file
@@ -442,8 +438,9 @@ public class FSDirectory implements Clos
     }
 
     // update space consumed
-    INode[] pathINodes = getExistingPathINodes(path);
-    updateCount(pathINodes, pathINodes.length-1, 0,
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, true);
+    final INode[] inodes = inodesInPath.getINodes();
+    updateCount(inodesInPath, inodes.length-1, 0,
         -fileNode.getPreferredBlockSize()*fileNode.getFileReplication(), true);
   }
 
@@ -511,7 +508,8 @@ public class FSDirectory implements Clos
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
     assert hasWriteLock();
-    INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    INode[] srcInodes = srcInodesInPath.getINodes();
     INode srcInode = srcInodes[srcInodes.length-1];
     
     // check the validation of the source
@@ -574,7 +572,7 @@ public class FSDirectory implements Clos
     String srcChildName = null;
     try {
       // remove src
-      srcChild = removeChild(srcInodes, srcInodes.length-1);
+      srcChild = removeChild(srcInodesInPath, srcInodes.length-1);
       if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
@@ -585,7 +583,7 @@ public class FSDirectory implements Clos
       srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
       // add src to the destination
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1,
           srcChild, UNKNOWN_DISK_SPACE);
       if (dstChild != null) {
         srcChild = null;
@@ -602,7 +600,7 @@ public class FSDirectory implements Clos
       if (dstChild == null && srcChild != null) {
         // put it back
         srcChild.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild, 
             UNKNOWN_DISK_SPACE);
       }
     }
@@ -635,7 +633,8 @@ public class FSDirectory implements Clos
       }
     }
     String error = null;
-    final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    final INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INode[] srcInodes = srcInodesInPath.getINodes();
     final INode srcInode = srcInodes[srcInodes.length - 1];
     // validate source
     if (srcInode == null) {
@@ -721,7 +720,7 @@ public class FSDirectory implements Clos
 
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+    INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1);
     if (removedSrc == null) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
@@ -734,14 +733,14 @@ public class FSDirectory implements Clos
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeChild(dstInodes, dstInodes.length - 1);
+        removedDst = removeChild(dstInodesInPath, dstInodes.length - 1);
         dstChildName = removedDst.getLocalName();
       }
 
       INode dstChild = null;
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1,
           removedSrc, UNKNOWN_DISK_SPACE);
 
       int filesDeleted = 0;
@@ -769,13 +768,13 @@ public class FSDirectory implements Clos
       if (removedSrc != null) {
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc, 
             UNKNOWN_DISK_SPACE);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
-        addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
+        addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst, 
             UNKNOWN_DISK_SPACE);
       }
     }
@@ -815,7 +814,8 @@ public class FSDirectory implements Clos
                                     UnresolvedLinkException {
     assert hasWriteLock();
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode inode = inodes[inodes.length - 1];
     if (inode == null) {
       return null;
@@ -829,7 +829,7 @@ public class FSDirectory implements Clos
 
     // check disk quota
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
-    updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+    updateCount(inodesInPath, inodes.length-1, 0, dsDelta, true);
 
     fileNode.setFileReplication(replication);
 
@@ -959,7 +959,8 @@ public class FSDirectory implements Clos
     }
     // do the move
     
-    INode [] trgINodes =  getExistingPathINodes(target);
+    final INodesInPath trgINodesInPath = rootDir.getExistingPathINodes(target, true);
+    final INode[] trgINodes = trgINodesInPath.getINodes();
     INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
     INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
     
@@ -986,7 +987,7 @@ public class FSDirectory implements Clos
     trgInode.setModificationTimeForce(timestamp);
     trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+    unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
 
   /**
@@ -1006,7 +1007,9 @@ public class FSDirectory implements Clos
     int filesRemoved;
     writeLock();
     try {
-      INode[] inodes = rootDir.getExistingPathINodes(normalizePath(src), false);
+      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(
+          normalizePath(src), false);
+      final INode[] inodes = inodesInPath.getINodes();
       if (checkPathINodes(inodes, src) == 0) {
         filesRemoved = 0;
       } else {
@@ -1020,7 +1023,7 @@ public class FSDirectory implements Clos
               + " cannot be deleted since " + snapshotNode.getFullPathName()
               + " is snapshottable and already has snapshots");
         }
-        filesRemoved = unprotectedDelete(inodes, collectedBlocks, now);
+        filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks, now);
       }
     } finally {
       writeUnlock();
@@ -1084,11 +1087,13 @@ public class FSDirectory implements Clos
     List<Block> collectedBlocks = new ArrayList<Block>();
     int filesRemoved = 0;
 
-    INode[] inodes =  rootDir.getExistingPathINodes(normalizePath(src), false);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(
+        normalizePath(src), false);
+    final INode[] inodes = inodesInPath.getINodes();
     if (checkPathINodes(inodes, src) == 0) {
       filesRemoved = 0;
     } else {
-      filesRemoved = unprotectedDelete(inodes, collectedBlocks, mtime);
+      filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks, mtime);
     }
     if (filesRemoved > 0) {
       getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
@@ -1103,14 +1108,15 @@ public class FSDirectory implements Clos
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
-  int unprotectedDelete(INode[] inodes, List<Block> collectedBlocks, 
+  int unprotectedDelete(INodesInPath inodesInPath, List<Block> collectedBlocks, 
       long mtime) {
     assert hasWriteLock();
-    
+
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
     int pos = inodes.length - 1;
     // Remove the node from the namespace
-    targetNode = removeChild(inodes, pos);
+    targetNode = removeChild(inodesInPath, pos);
     if (targetNode == null) {
       return 0;
     }
@@ -1299,28 +1305,6 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
-
-  /**
-   * Retrieve the existing INodes along the given path.
-   * 
-   * @param path the path to explore
-   * @return INodes array containing the existing INodes in the order they
-   *         appear when following the path from the root INode to the
-   *         deepest INodes. The array size will be the number of expected
-   *         components in the path, and non existing components will be
-   *         filled with null
-   *         
-   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
-   */
-  INode[] getExistingPathINodes(String path) 
-    throws UnresolvedLinkException {
-    readLock();
-    try {
-      return rootDir.getExistingPathINodes(path, true);
-    } finally {
-      readUnlock();
-    }
-  }
   
   /**
    * Get the parent node of path.
@@ -1386,13 +1370,14 @@ public class FSDirectory implements Clos
                                                 UnresolvedLinkException {
     writeLock();
     try {
-      INode[] inodes = rootDir.getExistingPathINodes(path, false);
+      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
+      final INode[] inodes = inodesInPath.getINodes();
       int len = inodes.length;
       if (inodes[len - 1] == null) {
         throw new FileNotFoundException(path + 
                                         " does not exist under rootDir.");
       }
-      updateCount(inodes, len-1, nsDelta, dsDelta, true);
+      updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
     } finally {
       writeUnlock();
     }
@@ -1407,7 +1392,7 @@ public class FSDirectory implements Clos
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INode[] inodes, int numOfINodes, 
+  private void updateCount(INodesInPath inodesInPath, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
@@ -1415,29 +1400,25 @@ public class FSDirectory implements Clos
       //still initializing. do not check or update quotas.
       return;
     }
-    if (numOfINodes>inodes.length) {
+    final INode[] inodes = inodesInPath.getINodes();
+    if (numOfINodes > inodes.length) {
       numOfINodes = inodes.length;
     }
     if (checkQuota) {
       verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
     }
-    for(int i = 0; i < numOfINodes; i++) {
-      if (inodes[i].isQuotaSet()) { // a directory with quota
-        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-        node.updateNumItemsInTree(nsDelta, dsDelta);
-      }
-    }
+    unprotectedUpdateCount(inodesInPath, numOfINodes, nsDelta, dsDelta);
   }
   
   /** 
    * update quota of each inode and check to see if quota is exceeded. 
    * See {@link #updateCount(INode[], int, long, long, boolean)}
    */ 
-  private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
-                           long nsDelta, long dsDelta) {
+  private void updateCountNoQuotaCheck(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
     assert hasWriteLock();
     try {
-      updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
+      updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
     }
@@ -1451,9 +1432,10 @@ public class FSDirectory implements Clos
    * @param nsDelta
    * @param dsDelta
    */
-   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
-                                      long nsDelta, long dsDelta) {
-     assert hasWriteLock();
+  private void unprotectedUpdateCount(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
+    assert hasWriteLock();
+    final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
@@ -1530,7 +1512,7 @@ public class FSDirectory implements Clos
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
       for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR + names[i]);
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
         if (!inodes[i].isDirectory()) {
           throw new FileAlreadyExistsException("Parent path is not a directory: "
               + pathbuilder+ " "+inodes[i].getLocalName());
@@ -1572,8 +1554,7 @@ public class FSDirectory implements Clos
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        String cur = pathbuilder.toString();
-        unprotectedMkdir(inodes, i, components[i],
+        unprotectedMkdir(inodesInPath, i, components[i],
             (i < lastInodeIndex) ? parentPermissions : permissions, now);
         if (inodes[i] == null) {
           return false;
@@ -1582,6 +1563,8 @@ public class FSDirectory implements Clos
         // to match count of FilesDeleted metric.
         if (getFSNamesystem() != null)
           NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
         fsImage.getEditLog().logMkDir(cur, inodes[i]);
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -1602,30 +1585,30 @@ public class FSDirectory implements Clos
     INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
         components.length, false);
     INode[] inodes = inodesInPath.getINodes();
-    unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-        permissions, timestamp);
-    return inodes[inodes.length-1];
+    final int pos = inodes.length - 1;
+    unprotectedMkdir(inodesInPath, pos, components[pos], permissions, timestamp);
+    return inodes[pos];
   }
 
   /** create a directory at index pos.
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(INode[] inodes, int pos,
+  private void unprotectedMkdir(INodesInPath inodesInPath, int pos,
       byte[] name, PermissionStatus permission,
       long timestamp) throws QuotaExceededException {
     assert hasWriteLock();
-    inodes[pos] = addChild(inodes, pos, 
-        new INodeDirectory(name, permission, timestamp),
-        -1);
+    final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
+    final INode inode = addChild(inodesInPath, pos, dir, -1, true);
+    inodesInPath.setINode(pos, inode);
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
    * childDiskspace should be -1, if unknown. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addNode(String src, T child, 
-        long childDiskspace) 
-  throws QuotaExceededException, UnresolvedLinkException {
+   * @throws QuotaExceededException if it violates quota limit
+   */
+  private <T extends INode> T addNode(String src, T child, long childDiskspace
+      ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     byte[] path = components[components.length-1];
     child.setLocalName(path);
@@ -1634,8 +1617,8 @@ public class FSDirectory implements Clos
     try {
       INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
           components.length, false);
-      INode[] inodes = inodesInPath.getINodes();
-      return addChild(inodes, inodes.length-1, child, childDiskspace);
+      return addChild(inodesInPath, inodesInPath.getINodes().length-1, child,
+          childDiskspace, true);
     } finally {
       writeUnlock();
     }
@@ -1760,19 +1743,22 @@ public class FSDirectory implements Clos
   }
   
   /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1]. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+   * Its ancestors are stored at [0, pos-1].
+   * @return the added node. 
+   * @throws QuotaExceededException if it violates quota limit
+   */
+  private <T extends INode> T addChild(INodesInPath inodesInPath, int pos,
       T child, long childDiskspace,
       boolean checkQuota) throws QuotaExceededException {
-	// The filesystem limits are not really quotas, so this check may appear
-	// odd.  It's because a rename operation deletes the src, tries to add
-	// to the dest, if that fails, re-adds the src from whence it came.
-	// The rename code disables the quota when it's restoring to the
-	// original location becase a quota violation would cause the the item
-	// to go "poof".  The fs limits must be bypassed for the same reason.
+    final INode[] inodes = inodesInPath.getINodes();
+    // The filesystem limits are not really quotas, so this check may appear
+    // odd. It's because a rename operation deletes the src, tries to add
+    // to the dest, if that fails, re-adds the src from whence it came.
+    // The rename code disables the quota when it's restoring to the
+    // original location because a quota violation would cause the item
+    // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyFsLimits(pathComponents, pos, child);
+      verifyFsLimits(inodes, pos, child);
     }
     
     INode.DirCounts counts = new INode.DirCounts();
@@ -1780,31 +1766,22 @@ public class FSDirectory implements Clos
     if (childDiskspace < 0) {
       childDiskspace = counts.getDsCount();
     }
-    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
-        checkQuota);
-    if (pathComponents[pos-1] == null) {
+    updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
+    if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
-        child, true);
+    final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, -counts.getNsCount(), 
-          -childDiskspace, true);
+      updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true);
     }
     return addedNode;
   }
-
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
-      T child, long childDiskspace)
-      throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, childDiskspace, true);
-  }
   
-  private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+  private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath,
       int pos, T child, long childDiskspace) {
     T inode = null;
     try {
-      inode = addChild(pathComponents, pos, child, childDiskspace, false);
+      inode = addChild(inodesInPath, pos, child, childDiskspace, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
     }
@@ -1816,13 +1793,13 @@ public class FSDirectory implements Clos
    * Count of each ancestor with quota is also updated.
    * Return the removed node; null if the removal fails.
    */
-  private INode removeChild(INode[] pathComponents, int pos) {
-    INode removedNode = 
-      ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
+  private INode removeChild(final INodesInPath inodesInPath, int pos) {
+    final INode[] inodes = inodesInPath.getINodes();
+    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
-      updateCountNoQuotaCheck(pathComponents, pos,
+      updateCountNoQuotaCheck(inodesInPath, pos,
                   -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;
@@ -1957,7 +1934,8 @@ public class FSDirectory implements Clos
     
     String srcs = normalizePath(src);
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
     if (targetNode == null) {
       throw new FileNotFoundException("Directory does not exist: " + srcs);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1406014&r1=1406013&r2=1406014&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Nov  5 23:56:53 2012
@@ -159,6 +159,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
@@ -1674,7 +1675,7 @@ public class FSNamesystem implements Nam
     }
   }
 
-  /*
+  /**
    * Verify that parent directory of src exists.
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
@@ -1682,14 +1683,13 @@ public class FSNamesystem implements Nam
     assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
-      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
-      INode parentNode = pathINodes[pathINodes.length - 1];
+      final INode parentNode = dir.getINode(parent.toString());
       if (parentNode == null) {
         throw new FileNotFoundException("Parent directory doesn't exist: "
-            + parent.toString());
+            + parent);
       } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
         throw new ParentNotDirectoryException("Parent path is not a directory: "
-            + parent.toString());
+            + parent);
       }
     }
   }
@@ -2209,18 +2209,18 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
-      INode[] pathINodes = dir.getExistingPathINodes(src);
-      int inodesLen = pathINodes.length;
-      checkLease(src, clientName, pathINodes[inodesLen-1]);
-      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
-                                                pathINodes[inodesLen - 1];
+
+      final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
+      final INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction pendingFile
+          = checkLease(src, clientName, inodes[inodes.length - 1]);
                                                            
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pathINodes, targets);
+      newBlock = allocateBlock(src, inodesInPath, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
@@ -2431,14 +2431,12 @@ public class FSNamesystem implements Nam
    * Allocate a block at the given pending filename
    * 
    * @param src path to the file
-   * @param inodes INode representing each of the components of src. 
-   *        <code>inodes[inodes.length-1]</code> is the INode for the file.
-   *        
+   * @param inodesInPath representing each of the components of src. 
+   *                     The last INode is the INode for the file.
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private Block allocateBlock(String src, INode[] inodes,
-      DatanodeDescriptor targets[]) throws QuotaExceededException,
-      SafeModeException {
+  private Block allocateBlock(String src, INodesInPath inodesInPath,
+      DatanodeDescriptor targets[]) throws IOException {
     assert hasWriteLock();
     Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); 
     while(isValidBlock(b)) {
@@ -2447,7 +2445,7 @@ public class FSNamesystem implements Nam
     // Increment the generation stamp for every new block.
     nextGenerationStamp();
     b.setGenerationStamp(getGenerationStamp());
-    b = dir.addBlock(src, inodes, b, targets);
+    b = dir.addBlock(src, inodesInPath, b, targets);
     NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
         + blockPoolId + " " + b);
     return b;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1406014&r1=1406013&r2=1406014&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Mon Nov  5 23:56:53 2012
@@ -121,7 +121,7 @@ class FSPermissionChecker {
     }
     // check if (parentAccess != null) && file exists, then check sb
       // Resolve symlinks, the check is performed on the link target.
-      INode[] inodes = root.getExistingPathINodes(path, true);
+      final INode[] inodes = root.getExistingPathINodes(path, true).getINodes();
       int ancestorIndex = inodes.length - 2;
       for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
           ancestorIndex--);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1406014&r1=1406013&r2=1406014&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Nov  5 23:56:53 2012
@@ -284,14 +284,12 @@ public class INodeDirectory extends INod
    *         components in the path, and non existing components will be
    *         filled with null
    *         
-   * @see #getExistingPathINodes(byte[][], INode[])
+   * @see #getExistingPathINodes(byte[][], int, boolean)
    */
-  INode[] getExistingPathINodes(String path, boolean resolveLink) 
+  INodesInPath getExistingPathINodes(String path, boolean resolveLink) 
     throws UnresolvedLinkException {
     byte[][] components = getPathComponents(path);
-    INodesInPath inodes = this.getExistingPathINodes(components,
-        components.length, resolveLink);
-    return inodes.inodes;
+    return getExistingPathINodes(components, components.length, resolveLink);
   }
 
   /**
@@ -563,6 +561,10 @@ public class INodeDirectory extends INod
       inodes[size++] = node;
     }
     
+    void setINode(int i, INode inode) {
+      inodes[i] = inode;
+    }
+    
     /**
      * @return The number of non-null elements
      */