You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/04/24 20:01:46 UTC

svn commit: r1471567 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/

Author: suresh
Date: Wed Apr 24 18:01:46 2013
New Revision: 1471567

URL: http://svn.apache.org/r1471567
Log:
HDFS-4151. Merge r1406006 from trunk

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1471567&r1=1471566&r2=1471567&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Apr 24 18:01:46 2013
@@ -56,6 +56,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4124. Refactor INodeDirectory#getExistingPathINodes() to enable 
     returning more than INode array. (Jing Zhao via suresh)
 
+    HDFS-4151. Change the methods in FSDirectory to pass INodesInPath instead
+    of INode[] as a parameter. (szetszwo)
+
     HDFS-4129. Add utility methods to dump NameNode in memory tree for 
     testing. (szetszwo via suresh)
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1471567&r1=1471566&r2=1471567&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Apr 24 18:01:46 2013
@@ -310,22 +310,18 @@ public class FSDirectory implements Clos
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  BlockInfo addBlock(String path,
-                     INode[] inodes,
-                     Block block,
-                     DatanodeDescriptor targets[]
-  ) throws QuotaExceededException {
+  BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
+      DatanodeDescriptor targets[]) throws IOException {
     waitForReady();
 
     writeLock();
     try {
-      assert inodes[inodes.length-1].isUnderConstruction() :
-        "INode should correspond to a file under construction";
-      INodeFileUnderConstruction fileINode = 
-        (INodeFileUnderConstruction)inodes[inodes.length-1];
+      final INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction fileINode = 
+          INodeFileUnderConstruction.valueOf(inodes[inodes.length-1], path);
 
       // check quota limits and updated space consumed
-      updateCount(inodes, inodes.length-1, 0,
+      updateCount(inodesInPath, inodes.length-1, 0,
           fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true);
 
       // associate new last block for the file
@@ -418,8 +414,9 @@ public class FSDirectory implements Clos
     }
 
     // update space consumed
-    INode[] pathINodes = getExistingPathINodes(path);
-    updateCount(pathINodes, pathINodes.length-1, 0,
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, true);
+    final INode[] inodes = inodesInPath.getINodes();
+    updateCount(inodesInPath, inodes.length-1, 0,
         -fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true);
   }
 
@@ -487,7 +484,8 @@ public class FSDirectory implements Clos
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
     assert hasWriteLock();
-    INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    INode[] srcInodes = srcInodesInPath.getINodes();
     INode srcInode = srcInodes[srcInodes.length-1];
     
     // check the validation of the source
@@ -550,7 +548,7 @@ public class FSDirectory implements Clos
     String srcChildName = null;
     try {
       // remove src
-      srcChild = removeChild(srcInodes, srcInodes.length-1);
+      srcChild = removeChild(srcInodesInPath, srcInodes.length-1);
       if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
@@ -561,7 +559,7 @@ public class FSDirectory implements Clos
       srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
       // add src to the destination
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1,
           srcChild, UNKNOWN_DISK_SPACE);
       if (dstChild != null) {
         srcChild = null;
@@ -580,7 +578,7 @@ public class FSDirectory implements Clos
       if (dstChild == null && srcChild != null) {
         // put it back
         srcChild.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild, 
             UNKNOWN_DISK_SPACE);
       }
     }
@@ -613,7 +611,8 @@ public class FSDirectory implements Clos
       }
     }
     String error = null;
-    final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    final INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INode[] srcInodes = srcInodesInPath.getINodes();
     final INode srcInode = srcInodes[srcInodes.length - 1];
     // validate source
     if (srcInode == null) {
@@ -700,7 +699,7 @@ public class FSDirectory implements Clos
 
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+    INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1);
     if (removedSrc == null) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
@@ -713,14 +712,14 @@ public class FSDirectory implements Clos
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeChild(dstInodes, dstInodes.length - 1);
+        removedDst = removeChild(dstInodesInPath, dstInodes.length - 1);
         dstChildName = removedDst.getLocalName();
       }
 
       INode dstChild = null;
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1,
           removedSrc, UNKNOWN_DISK_SPACE);
 
       int filesDeleted = 0;
@@ -750,13 +749,13 @@ public class FSDirectory implements Clos
       if (removedSrc != null) {
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc, 
             UNKNOWN_DISK_SPACE);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
-        addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
+        addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst, 
             UNKNOWN_DISK_SPACE);
       }
     }
@@ -796,7 +795,8 @@ public class FSDirectory implements Clos
                                     UnresolvedLinkException {
     assert hasWriteLock();
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode inode = inodes[inodes.length - 1];
     if (inode == null || !inode.isFile()) {
       return null;
@@ -806,7 +806,7 @@ public class FSDirectory implements Clos
 
     // check disk quota
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
-    updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+    updateCount(inodesInPath, inodes.length-1, 0, dsDelta, true);
 
     fileNode.setReplication(replication);
 
@@ -927,7 +927,8 @@ public class FSDirectory implements Clos
     }
     // do the move
     
-    INode [] trgINodes =  getExistingPathINodes(target);
+    final INodesInPath trgINodesInPath = rootDir.getExistingPathINodes(target, true);
+    final INode[] trgINodes = trgINodesInPath.getINodes();
     INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
     INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
     
@@ -954,7 +955,7 @@ public class FSDirectory implements Clos
     trgInode.setModificationTimeForce(timestamp);
     trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+    unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
 
   /**
@@ -1037,7 +1038,8 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     src = normalizePath(src);
 
-    INode[] inodes =  rootDir.getExistingPathINodes(src, false);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
 
     if (targetNode == null) { // non-existent src
@@ -1055,7 +1057,7 @@ public class FSDirectory implements Clos
     }
     int pos = inodes.length - 1;
     // Remove the node from the namespace
-    targetNode = removeChild(inodes, pos);
+    targetNode = removeChild(inodesInPath, pos);
     if (targetNode == null) {
       return 0;
     }
@@ -1190,28 +1192,6 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
-
-  /**
-   * Retrieve the existing INodes along the given path.
-   * 
-   * @param path the path to explore
-   * @return INodes array containing the existing INodes in the order they
-   *         appear when following the path from the root INode to the
-   *         deepest INodes. The array size will be the number of expected
-   *         components in the path, and non existing components will be
-   *         filled with null
-   *         
-   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
-   */
-  INode[] getExistingPathINodes(String path) 
-    throws UnresolvedLinkException {
-    readLock();
-    try {
-      return rootDir.getExistingPathINodes(path, true);
-    } finally {
-      readUnlock();
-    }
-  }
   
   /**
    * Get the parent node of path.
@@ -1277,13 +1257,14 @@ public class FSDirectory implements Clos
                                                 UnresolvedLinkException {
     writeLock();
     try {
-      INode[] inodes = rootDir.getExistingPathINodes(path, false);
+      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
+      final INode[] inodes = inodesInPath.getINodes();
       int len = inodes.length;
       if (inodes[len - 1] == null) {
         throw new FileNotFoundException(path + 
                                         " does not exist under rootDir.");
       }
-      updateCount(inodes, len-1, nsDelta, dsDelta, true);
+      updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
     } finally {
       writeUnlock();
     }
@@ -1298,7 +1279,7 @@ public class FSDirectory implements Clos
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INode[] inodes, int numOfINodes, 
+  private void updateCount(INodesInPath inodesInPath, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
@@ -1306,29 +1287,25 @@ public class FSDirectory implements Clos
       //still initializing. do not check or update quotas.
       return;
     }
-    if (numOfINodes>inodes.length) {
+    final INode[] inodes = inodesInPath.getINodes();
+    if (numOfINodes > inodes.length) {
       numOfINodes = inodes.length;
     }
     if (checkQuota) {
       verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
     }
-    for(int i = 0; i < numOfINodes; i++) {
-      if (inodes[i].isQuotaSet()) { // a directory with quota
-        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-        node.addSpaceConsumed(nsDelta, dsDelta);
-      }
-    }
+    unprotectedUpdateCount(inodesInPath, numOfINodes, nsDelta, dsDelta);
   }
   
   /** 
    * update quota of each inode and check to see if quota is exceeded. 
    * See {@link #updateCount(INode[], int, long, long, boolean)}
    */ 
-  private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
-                           long nsDelta, long dsDelta) {
+  private void updateCountNoQuotaCheck(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
     assert hasWriteLock();
     try {
-      updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
+      updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
     }
@@ -1342,9 +1319,10 @@ public class FSDirectory implements Clos
    * @param nsDelta
    * @param dsDelta
    */
-   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
-                                      long nsDelta, long dsDelta) {
-     assert hasWriteLock();
+  private void unprotectedUpdateCount(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
+    assert hasWriteLock();
+    final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
@@ -1426,7 +1404,7 @@ public class FSDirectory implements Clos
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
       for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR + names[i]);
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
         if (!inodes[i].isDirectory()) {
           throw new FileAlreadyExistsException("Parent path is not a directory: "
               + pathbuilder+ " "+inodes[i].getLocalName());
@@ -1468,8 +1446,7 @@ public class FSDirectory implements Clos
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        String cur = pathbuilder.toString();
-        unprotectedMkdir(inodes, i, components[i],
+        unprotectedMkdir(inodesInPath, i, components[i],
             (i < lastInodeIndex) ? parentPermissions : permissions, now);
         if (inodes[i] == null) {
           return false;
@@ -1478,6 +1455,8 @@ public class FSDirectory implements Clos
         // to match count of FilesDeleted metric.
         if (getFSNamesystem() != null)
           NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
         fsImage.getEditLog().logMkDir(cur, inodes[i]);
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -1498,30 +1477,30 @@ public class FSDirectory implements Clos
     INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
         components.length, false);
     INode[] inodes = inodesInPath.getINodes();
-    unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-        permissions, timestamp);
-    return inodes[inodes.length-1];
+    final int pos = inodes.length - 1;
+    unprotectedMkdir(inodesInPath, pos, components[pos], permissions, timestamp);
+    return inodes[pos];
   }
 
   /** create a directory at index pos.
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(INode[] inodes, int pos,
+  private void unprotectedMkdir(INodesInPath inodesInPath, int pos,
       byte[] name, PermissionStatus permission,
       long timestamp) throws QuotaExceededException {
     assert hasWriteLock();
-    inodes[pos] = addChild(inodes, pos, 
-        new INodeDirectory(name, permission, timestamp),
-        -1);
+    final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
+    final INode inode = addChild(inodesInPath, pos, dir, -1, true);
+    inodesInPath.setINode(pos, inode);
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
    * childDiskspace should be -1, if unknown. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addNode(String src, T child, 
-        long childDiskspace) 
-  throws QuotaExceededException, UnresolvedLinkException {
+   * @throw QuotaExceededException is thrown if it violates quota limit
+   */
+  private <T extends INode> T addNode(String src, T child, long childDiskspace
+      ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     byte[] path = components[components.length-1];
     child.setLocalName(path);
@@ -1530,8 +1509,8 @@ public class FSDirectory implements Clos
     try {
       INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
           components.length, false);
-      INode[] inodes = inodesInPath.getINodes();
-      return addChild(inodes, inodes.length-1, child, childDiskspace);
+      return addChild(inodesInPath, inodesInPath.getINodes().length-1, child,
+          childDiskspace, true);
     } finally {
       writeUnlock();
     }
@@ -1656,19 +1635,22 @@ public class FSDirectory implements Clos
   }
   
   /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1]. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+   * Its ancestors are stored at [0, pos-1].
+   * @return the added node. 
+   * @throw QuotaExceededException is thrown if it violates quota limit
+   */
+  private <T extends INode> T addChild(INodesInPath inodesInPath, int pos,
       T child, long childDiskspace,
       boolean checkQuota) throws QuotaExceededException {
-	// The filesystem limits are not really quotas, so this check may appear
-	// odd.  It's because a rename operation deletes the src, tries to add
-	// to the dest, if that fails, re-adds the src from whence it came.
-	// The rename code disables the quota when it's restoring to the
-	// original location becase a quota violation would cause the the item
-	// to go "poof".  The fs limits must be bypassed for the same reason.
+    final INode[] inodes = inodesInPath.getINodes();
+    // The filesystem limits are not really quotas, so this check may appear
+    // odd. It's because a rename operation deletes the src, tries to add
+    // to the dest, if that fails, re-adds the src from whence it came.
+    // The rename code disables the quota when it's restoring to the
+    // original location becase a quota violation would cause the the item
+    // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyFsLimits(pathComponents, pos, child);
+      verifyFsLimits(inodes, pos, child);
     }
     
     INode.DirCounts counts = new INode.DirCounts();
@@ -1676,31 +1658,22 @@ public class FSDirectory implements Clos
     if (childDiskspace < 0) {
       childDiskspace = counts.getDsCount();
     }
-    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
-        checkQuota);
-    if (pathComponents[pos-1] == null) {
+    updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
+    if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
-        child, true);
+    final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, -counts.getNsCount(), 
-          -childDiskspace, true);
+      updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true);
     }
     return addedNode;
   }
-
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
-      T child, long childDiskspace)
-      throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, childDiskspace, true);
-  }
   
-  private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+  private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath,
       int pos, T child, long childDiskspace) {
     T inode = null;
     try {
-      inode = addChild(pathComponents, pos, child, childDiskspace, false);
+      inode = addChild(inodesInPath, pos, child, childDiskspace, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
     }
@@ -1712,13 +1685,13 @@ public class FSDirectory implements Clos
    * Count of each ancestor with quota is also updated.
    * Return the removed node; null if the removal fails.
    */
-  private INode removeChild(INode[] pathComponents, int pos) {
-    INode removedNode = 
-      ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
+  private INode removeChild(final INodesInPath inodesInPath, int pos) {
+    final INode[] inodes = inodesInPath.getINodes();
+    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
-      updateCountNoQuotaCheck(pathComponents, pos,
+      updateCountNoQuotaCheck(inodesInPath, pos,
                   -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;
@@ -1853,7 +1826,8 @@ public class FSDirectory implements Clos
     
     String srcs = normalizePath(src);
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
     if (targetNode == null) {
       throw new FileNotFoundException("Directory does not exist: " + srcs);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1471567&r1=1471566&r2=1471567&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 24 18:01:46 2013
@@ -167,6 +167,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
@@ -1688,7 +1689,7 @@ public class FSNamesystem implements Nam
     }
   }
 
-  /*
+  /**
    * Verify that parent directory of src exists.
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
@@ -1696,14 +1697,13 @@ public class FSNamesystem implements Nam
     assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
-      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
-      INode parentNode = pathINodes[pathINodes.length - 1];
+      final INode parentNode = dir.getINode(parent.toString());
       if (parentNode == null) {
         throw new FileNotFoundException("Parent directory doesn't exist: "
-            + parent.toString());
+            + parent);
       } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
         throw new ParentNotDirectoryException("Parent path is not a directory: "
-            + parent.toString());
+            + parent);
       }
     }
   }
@@ -2149,7 +2149,7 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.READ);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       final INode[] inodes = analyzeFileState(
-          src, clientName, previous, onRetryBlock);
+          src, clientName, previous, onRetryBlock).getINodes();
       final INodeFileUnderConstruction pendingFile =
           (INodeFileUnderConstruction) inodes[inodes.length - 1];
 
@@ -2180,8 +2180,9 @@ public class FSNamesystem implements Nam
       // Run the full analysis again, since things could have changed
       // while chooseTarget() was executing.
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-      INode[] inodes =
+      INodesInPath inodesInPath =
           analyzeFileState(src, clientName, previous, onRetryBlock);
+      final INode[] inodes = inodesInPath.getINodes();
       final INodeFileUnderConstruction pendingFile =
           (INodeFileUnderConstruction) inodes[inodes.length - 1];
 
@@ -2196,7 +2197,7 @@ public class FSNamesystem implements Nam
 
       // allocate new block, record block locations in INode.
       newBlock = createNewBlock();
-      saveAllocatedBlock(src, inodes, newBlock, targets);
+      saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
       dir.persistBlocks(src, pendingFile);
       offset = pendingFile.computeFileSize(true);
@@ -2211,7 +2212,7 @@ public class FSNamesystem implements Nam
     return makeLocatedBlock(newBlock, targets, offset);
   }
 
-  INode[] analyzeFileState(String src,
+  INodesInPath analyzeFileState(String src,
                                 String clientName,
                                 ExtendedBlock previous,
                                 LocatedBlock[] onRetryBlock)
@@ -2229,7 +2230,8 @@ public class FSNamesystem implements Nam
     checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INode[] inodes = dir.rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     final INodeFileUnderConstruction pendingFile
         = checkLease(src, clientName, inodes[inodes.length - 1]);
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
@@ -2289,7 +2291,7 @@ public class FSNamesystem implements Nam
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
             offset);
-        return inodes;
+        return inodesInPath;
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -2302,7 +2304,7 @@ public class FSNamesystem implements Nam
     if (!checkFileProgress(pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
-    return inodes;
+    return inodesInPath;
   }
 
   LocatedBlock makeLocatedBlock(Block blk,
@@ -2512,7 +2514,7 @@ public class FSNamesystem implements Nam
    *                     The last INode is the INode for the file.
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  BlockInfo saveAllocatedBlock(String src, INode[] inodes,
+  BlockInfo saveAllocatedBlock(String src, INodesInPath inodes,
       Block newBlock, DatanodeDescriptor targets[]) throws IOException {
     assert hasWriteLock();
     BlockInfo b = dir.addBlock(src, inodes, newBlock, targets);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1471567&r1=1471566&r2=1471567&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Wed Apr 24 18:01:46 2013
@@ -132,7 +132,7 @@ class FSPermissionChecker {
     }
     // check if (parentAccess != null) && file exists, then check sb
       // Resolve symlinks, the check is performed on the link target.
-      INode[] inodes = root.getExistingPathINodes(path, true);
+      final INode[] inodes = root.getExistingPathINodes(path, true).getINodes();
       int ancestorIndex = inodes.length - 2;
       for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
           ancestorIndex--);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1471567&r1=1471566&r2=1471567&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Apr 24 18:01:46 2013
@@ -245,14 +245,12 @@ class INodeDirectory extends INode {
    *         components in the path, and non existing components will be
    *         filled with null
    *         
-   * @see #getExistingPathINodes(byte[][], INode[])
+   * @see #getExistingPathINodes(byte[][], int, boolean)
    */
-  INode[] getExistingPathINodes(String path, boolean resolveLink) 
+  INodesInPath getExistingPathINodes(String path, boolean resolveLink) 
     throws UnresolvedLinkException {
     byte[][] components = getPathComponents(path);
-    INodesInPath inodes = this.getExistingPathINodes(components,
-        components.length, resolveLink);
-    return inodes.inodes;
+    return getExistingPathINodes(components, components.length, resolveLink);
   }
 
   /**
@@ -419,6 +417,28 @@ class INodeDirectory extends INode {
     children = null;
     return total;
   }
+  
+  /**
+   * Used by
+   * {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}.
+   * Containing INodes information resolved from a given path.
+   */
+  static class INodesInPath {
+    private INode[] inodes;
+    
+    public INodesInPath(int number) {
+      assert (number >= 0);
+      this.inodes = new INode[number];
+    }
+    
+    INode[] getINodes() {
+      return inodes;
+    }
+    
+    void setINode(int i, INode inode) {
+      inodes[i] = inode;
+    }
+  }
 
   /*
    * The following code is to dump the tree recursively for testing.
@@ -467,22 +487,4 @@ class INodeDirectory extends INode {
     }
     prefix.setLength(prefix.length() - 2);
   }
-  
-  /**
-   * Used by
-   * {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}.
-   * Containing INodes information resolved from a given path.
-   */
-  static class INodesInPath {
-    private INode[] inodes;
-    
-    public INodesInPath(int number) {
-      assert (number >= 0);
-      this.inodes = new INode[number];
-    }
-    
-    INode[] getINodes() {
-      return inodes;
-    }
-  }
 }