You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:31:05 UTC

svn commit: r1077022 - in /hadoop/common/branches/branch-0.20-security-patches/src: hdfs/org/apache/hadoop/hdfs/server/namenode/ test/org/apache/hadoop/hdfs/

Author: omalley
Date: Fri Mar  4 03:31:05 2011
New Revision: 1077022

URL: http://svn.apache.org/viewvc?rev=1077022&view=rev
Log:
commit 24f5ea08fdac972e99bcecc4c51f71da02ab26ca
Author: Suresh Srinivas <su...@yahoo-inc.com>
Date:   Fri Oct 16 10:37:51 2009 -0700

    HADOOP:677 from https://issues.apache.org/jira/secure/attachment/12422371/hdfs-677.rel20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +71. HDFS-709. Fix TestDFSShell failure due to rename bug introduced by
    +    HDFS-677. (suresh)
    +
    +70. HDFS-677. Rename failure when both source and destination quota exceeds
    +    results in deletion of source. (suresh)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSRename.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1077022&r1=1077021&r2=1077022&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Mar  4 03:31:05 2011
@@ -262,8 +262,8 @@ class FSDirectory implements FSConstants
       INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
 
       // check quota limits and updated space consumed
-      updateCount(inodes, inodes.length-1, 0, 
-                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
+      updateCount(inodes, inodes.length-1, 0,
+          fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
       
       // associate the new list of blocks with this file
       namesystem.blocksMap.addINode(block, fileNode);
@@ -363,9 +363,11 @@ class FSDirectory implements FSConstants
       // check the validation of the source
       if (srcInodes[srcInodes.length-1] == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                     +"failed to rename "+src+" to "+dst+ " because source does not exist");
+            + "failed to rename " + src + " to " + dst
+            + " because source does not exist");
         return false;
-      } else if (srcInodes.length == 1) {
+      } 
+      if (srcInodes.length == 1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             +"failed to rename "+src+" to "+dst+ " because source is the root");
         return false;
@@ -374,71 +376,78 @@ class FSDirectory implements FSConstants
         dst += Path.SEPARATOR + new Path(src).getName();
       }
       
-      // remove source
-      INode srcChild = null;
-      try {
-        srcChild = removeChild(srcInodes, srcInodes.length-1);
-      } catch (IOException e) {
-        // srcChild == null; go to next if statement
+      // check the validity of the destination
+      if (dst.equals(src)) {
+        return true;
       }
-      if (srcChild == null) {
+      // dst cannot be directory or a file under src
+      if (dst.startsWith(src) && 
+          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
+            + "failed to rename " + src + " to " + dst
+            + " because destination starts with src");
         return false;
       }
-
-      String srcChildName = srcChild.getLocalName();
       
-      // check the validity of the destination
-      INode dstChild = null;
-      QuotaExceededException failureByQuota = null;
-
       byte[][] dstComponents = INode.getPathComponents(dst);
       INode[] dstInodes = new INode[dstComponents.length];
       rootDir.getExistingPathINodes(dstComponents, dstInodes);
-      if (dstInodes[dstInodes.length-1] != null) { //check if destination exists
+      if (dstInodes[dstInodes.length-1] != null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                      +"failed to rename "+src+" to "+dst+ 
                                      " because destination exists");
-      } else if (dstInodes[dstInodes.length-2] == null) { // check if its parent exists
+        return false;
+      }
+      if (dstInodes[dstInodes.length-2] == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             +"failed to rename "+src+" to "+dst+ 
-            " because destination's parent does not exists");
+            " because destination's parent does not exist");
+        return false;
       }
-      else {
-        // add to the destination
-        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
-        try {
-          // add it to the namespace
-          dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
-        } catch (QuotaExceededException qe) {
-          failureByQuota = qe;
+      
+      // Ensure dst has quota to accommodate rename
+      verifyQuotaForRename(srcInodes,dstInodes);
+      
+      INode dstChild = null;
+      INode srcChild = null;
+      String srcChildName = null;
+      try {
+        // remove src
+        srcChild = removeChild(srcInodes, srcInodes.length-1);
+        if (srcChild == null) {
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + "failed to rename " + src + " to " + dst
+              + " because the source can not be removed");
+          return false;
         }
-      }
-      if (dstChild != null) {
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-            +src+" is renamed to "+dst);
+        srcChildName = srcChild.getLocalName();
+        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
+        
+        // add src to the destination
+        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+            srcChild, -1, false);
+        if (dstChild != null) {
+          srcChild = null;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " + src
+                    + " is renamed to " + dst);
+          }
+          // update modification time of dst and the parent of src
+          srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+          dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+          return true;
         }
-
-        // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
-        return true;
-      } else {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst);
-        try {
+      } finally {
+        if (dstChild == null && srcChild != null) {
           // put it back
           srcChild.setLocalName(srcChildName);
-          addChild(srcInodes, srcInodes.length-1, srcChild, false);
-        } catch (IOException ignored) {}
-        if (failureByQuota != null) {
-          throw failureByQuota;
-        } else {
-          return false;
+          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, -1,
+              false);
         }
       }
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst);
+      return false;
     }
   }
 
@@ -483,7 +492,7 @@ class FSDirectory implements FSConstants
       // check disk quota
       long dsDelta = (replication - oldReplication[0]) *
            (fileNode.diskspaceConsumed()/oldReplication[0]);
-      updateCount(inodes, inodes.length-1, 0, dsDelta);
+      updateCount(inodes, inodes.length-1, 0, dsDelta, true);
 
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
@@ -828,7 +837,7 @@ class FSDirectory implements FSConstants
         throw new FileNotFoundException(path + 
                                         " does not exist under rootDir.");
       }
-      updateCount(inodes, len-1, nsDelta, dsDelta);
+      updateCount(inodes, len-1, nsDelta, dsDelta, true);
     }
   }
   
@@ -838,10 +847,11 @@ class FSDirectory implements FSConstants
    * @param numOfINodes the number of inodes to update starting from index 0
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
+   * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
   private void updateCount(INode[] inodes, int numOfINodes, 
-                           long nsDelta, long dsDelta)
+                           long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     if (!ready) {
       //still intializing. do not check or update quotas.
@@ -850,28 +860,27 @@ class FSDirectory implements FSConstants
     if (numOfINodes>inodes.length) {
       numOfINodes = inodes.length;
     }
-    // check existing components in the path  
-    int i=0;
-    try {
-      for(; i < numOfINodes; i++) {
-        if (inodes[i].isQuotaSet()) { // a directory with quota
-          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-          node.updateNumItemsInTree(nsDelta, dsDelta);
-        }
+    if (checkQuota) {
+      verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
+    }
+    for(int i = 0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+        node.updateNumItemsInTree(nsDelta, dsDelta);
       }
+    }
+  }
+  
+  /** 
+   * update quota of each inode and check to see if quota is exceeded. 
+   * See {@link #updateCount(INode[], int, long, long, boolean)}
+   */ 
+  private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
+                           long nsDelta, long dsDelta) {
+    try {
+      updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
-      e.setPathName(getFullPathName(inodes, i));
-      // undo updates
-      for( ; i-- > 0; ) {
-        try {
-          if (inodes[i].isQuotaSet()) { // a directory with quota
-            INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-            node.updateNumItemsInTree(-nsDelta, -dsDelta);
-          }
-        } catch (IOException ingored) {
-        }
-      }
-      throw e;
+      NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
     }
   }
   
@@ -965,7 +974,7 @@ class FSDirectory implements FSConstants
       long timestamp) throws QuotaExceededException {
     inodes[pos] = addChild(inodes, pos, 
         new INodeDirectory(name, permission, timestamp),
-        inheritPermission );
+        -1, inheritPermission );
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
@@ -983,48 +992,119 @@ class FSDirectory implements FSConstants
                       inheritPermission);
     }
   }
+
+  /**
+   * Verify quota for adding or moving a new INode with required 
+   * namespace and diskspace to a given position.
+   *  
+   * @param inodes INodes corresponding to a path
+   * @param pos position where a new INode will be added
+   * @param nsDelta needed namespace
+   * @param dsDelta needed diskspace
+   * @param commonAncestor Last node in inodes array that is a common ancestor
+   *          for a INode that is being moved from one location to the other.
+   *          Pass null if a node is not being moved.
+   * @throws QuotaExceededException if quota limit is exceeded.
+   */
+  private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
+      INode commonAncestor) throws QuotaExceededException {
+    if (pos>inodes.length) {
+      pos = inodes.length;
+    }
+    int i = pos - 1;
+    try {
+      // check existing components in the path  
+      for(; i >= 0; i--) {
+        if (commonAncestor == inodes[i]) {
+          // Moving an existing node. Stop checking for quota when common
+          // ancestor is reached
+          return;
+        }
+        if (inodes[i].isQuotaSet()) { // a directory with quota
+          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+          node.verifyQuota(nsDelta, dsDelta);
+        }
+      }
+    } catch (QuotaExceededException e) {
+      e.setPathName(getFullPathName(inodes, i));
+      throw e;
+    }
+  }
   
-  /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1]. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
-      boolean inheritPermission) throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, -1, inheritPermission);
+  /**
+   * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
+   * dstInodes[dstInodes.length-1]
+   * 
+   * @param srcInodes directory from where node is being moved.
+   * @param dstInodes directory to where node is moved to.
+   * @throws QuotaExceededException if quota limit is exceeded.
+   */
+  private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
+      throws QuotaExceededException {
+    INode srcInode = srcInodes[srcInodes.length - 1];
+    INode commonAncestor = null;
+    for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
+      commonAncestor = srcInodes[i];
+    }
+    INode.DirCounts counts = new INode.DirCounts();
+    srcInode.spaceConsumedInTree(counts);
+    verifyQuota(dstInodes, dstInodes.length - 1, counts.getNsCount(),
+            counts.getDsCount(), commonAncestor);
   }
   
   /** Add a node child to the inodes at index pos. 
    * Its ancestors are stored at [0, pos-1]. 
    * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
-       long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
+  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+      T child, long childDiskspace, boolean inheritPermission,
+      boolean checkQuota) throws QuotaExceededException {
     INode.DirCounts counts = new INode.DirCounts();
     child.spaceConsumedInTree(counts);
     if (childDiskspace < 0) {
       childDiskspace = counts.getDsCount();
     }
-    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
+    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
+        checkQuota);
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
         child, inheritPermission);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, 
-                  -counts.getNsCount(), -childDiskspace);
+      updateCount(pathComponents, pos, -counts.getNsCount(), 
+          -childDiskspace, true);
     }
     return addedNode;
   }
+
+  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+      T child, long childDiskspace, boolean inheritPermission)
+      throws QuotaExceededException {
+    return addChild(pathComponents, pos, child, childDiskspace,
+        inheritPermission, true);
+  }
+  
+  private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+      int pos, T child, long childDiskspace, boolean inheritPermission) {
+    T inode = null;
+    try {
+      inode = addChild(pathComponents, pos, child, childDiskspace,
+          inheritPermission, false);
+    } catch (QuotaExceededException e) {
+      NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
+    }
+    return inode;
+  }
   
   /** Remove an inode at index pos from the namespace.
    * Its ancestors are stored at [0, pos-1].
    * Count of each ancestor with quota is also updated.
    * Return the removed node; null if the removal fails.
    */
-  private INode removeChild(INode[] pathComponents, int pos)
-  throws QuotaExceededException {
+  private INode removeChild(INode[] pathComponents, int pos) {
     INode removedNode = 
       ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
-      updateCount(pathComponents, pos,
+      updateCountNoQuotaCheck(pathComponents, pos,
                   -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1077022&r1=1077021&r2=1077022&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Mar  4 03:31:05 2011
@@ -116,18 +116,10 @@ class INodeDirectoryWithQuota extends IN
    * 
    * @param nsDelta the change of the tree size
    * @param dsDelta change to disk space occupied
-   * @throws QuotaExceededException if the changed size is greater 
-   *                                than the quota
    */
-  void updateNumItemsInTree(long nsDelta, long dsDelta) throws 
-                            QuotaExceededException {
-    long newCount = nsCount + nsDelta;
-    long newDiskspace = diskspace + dsDelta;
-    if (nsDelta>0 || dsDelta>0) {
-      verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
-    }
-    nsCount = newCount;
-    diskspace = newDiskspace;
+  void updateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount += nsDelta;
+    diskspace += dsDelta;
   }
   
   /** 
@@ -146,14 +138,16 @@ class INodeDirectoryWithQuota extends IN
   /** Verify if the namespace count disk space satisfies the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
-  private static void verifyQuota(long nsQuota, long nsCount, 
-                                  long dsQuota, long diskspace)
-                                  throws QuotaExceededException {
-    if (nsQuota >= 0 && nsQuota < nsCount) {
-      throw new NSQuotaExceededException(nsQuota, nsCount);
-    }
-    if (dsQuota >= 0 && dsQuota < diskspace) {
-      throw new DSQuotaExceededException(dsQuota, diskspace);
+  void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
+    long newCount = nsCount + nsDelta;
+    long newDiskspace = diskspace + dsDelta;
+    if (nsDelta>0 || dsDelta>0) {
+      if (nsQuota >= 0 && nsQuota < newCount) {
+        throw new NSQuotaExceededException(nsQuota, newCount);
+      }
+      if (dsQuota >= 0 && dsQuota < newDiskspace) {
+        throw new DSQuotaExceededException(dsQuota, newDiskspace);
+      }
     }
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSRename.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSRename.java?rev=1077022&r1=1077021&r2=1077022&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSRename.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSRename.java Fri Mar  4 03:31:05 2011
@@ -23,14 +23,28 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 
 public class TestDFSRename extends junit.framework.TestCase {
+  MiniDFSCluster cluster = null;
   static int countLease(MiniDFSCluster cluster) {
     return cluster.getNameNode().namesystem.leaseManager.countLease();
   }
   
   final Path dir = new Path("/test/rename/");
 
+  @Override
+  protected void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster(conf, 2, true, null);
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (cluster != null) {cluster.shutdown();}
+  }
+  
   void list(FileSystem fs, String name) throws IOException {
     FileSystem.LOG.info("\n\n" + name);
     for(FileStatus s : fs.listStatus(dir)) {
@@ -38,61 +52,119 @@ public class TestDFSRename extends junit
     }
   }
 
+  static void createFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream a_out = fs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+  
   public void testRename() throws Exception {
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      assertTrue(fs.mkdirs(dir));
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue(fs.mkdirs(dir));
+    
+    { //test lease
+      Path a = new Path(dir, "a");
+      Path aa = new Path(dir, "aa");
+      Path b = new Path(dir, "b");
+
+      createFile(fs, a);
       
-      { //test lease
-        Path a = new Path(dir, "a");
-        Path aa = new Path(dir, "aa");
-        Path b = new Path(dir, "b");
-  
-        DataOutputStream a_out = fs.create(a);
-        a_out.writeBytes("something");
-        a_out.close();
-        
-        //should not have any lease
-        assertEquals(0, countLease(cluster)); 
-  
-        DataOutputStream aa_out = fs.create(aa);
-        aa_out.writeBytes("something");
-  
-        //should have 1 lease
-        assertEquals(1, countLease(cluster)); 
-        list(fs, "rename0");
-        fs.rename(a, b);
-        list(fs, "rename1");
-        aa_out.writeBytes(" more");
-        aa_out.close();
-        list(fs, "rename2");
-  
-        //should not have any lease
-        assertEquals(0, countLease(cluster));
-      }
-
-      { // test non-existent destination
-        Path dstPath = new Path("/c/d");
-        assertFalse(fs.exists(dstPath));
-        assertFalse(fs.rename(dir, dstPath));
-      }
-
-      { // test rename /a/b to /a/b/c
-        Path src = new Path("/a/b");
-        Path dst = new Path("/a/b/c");
-
-        DataOutputStream a_out = fs.create(new Path(src, "foo"));
-        a_out.writeBytes("something");
-        a_out.close();
-        
-        assertFalse(fs.rename(src, dst));
-      }
+      //should not have any lease
+      assertEquals(0, countLease(cluster)); 
+
+      createFile(fs, aa);
+      DataOutputStream aa_out = fs.create(aa);
+      aa_out.writeBytes("something");
+
+      //should have 1 lease
+      assertEquals(1, countLease(cluster)); 
+      list(fs, "rename0");
+      fs.rename(a, b);
+      list(fs, "rename1");
+      aa_out.writeBytes(" more");
+      aa_out.close();
+      list(fs, "rename2");
+
+      //should not have any lease
+      assertEquals(0, countLease(cluster));
+    }
+
+    { // test non-existent destination
+      Path dstPath = new Path("/c/d");
+      assertFalse(fs.exists(dstPath));
+      assertFalse(fs.rename(dir, dstPath));
+    }
+
+    { // dst cannot be a file or directory under src
+      // test rename /a/b/foo to /a/b/c
+      Path src = new Path("/a/b");
+      Path dst = new Path("/a/b/c");
+
+      createFile(fs, new Path(src, "foo"));
+      
+      // dst cannot be a file under src
+      assertFalse(fs.rename(src, dst)); 
       
-      fs.delete(dir, true);
-    } finally {
-      if (cluster != null) {cluster.shutdown();}
+      // dst cannot be a directory under src
+      assertFalse(fs.rename(src.getParent(), dst.getParent())); 
+    }
+    
+    { // dst can start with src, if it is not a directory or file under src
+      // test rename /test /testfile
+      Path src = new Path("/testPrefix");
+      Path dst = new Path("/testPrefixfile");
+
+      createFile(fs, src);
+      assertTrue(fs.rename(src, dst));
+    }
+    
+    { // dst should not be same as src test rename /a/b/c to /a/b/c
+      Path src = new Path("/a/b/c");
+      createFile(fs, src);
+      assertTrue(fs.rename(src, src));
+      assertFalse(fs.rename(new Path("/a/b"), new Path("/a/b/")));
+      assertTrue(fs.rename(src, new Path("/a/b/c/")));
+    }
+    
+    fs.delete(dir, true);
+  }
+  
+  public void testRenameWithQuota() throws Exception {
+    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+    Path src1 = new Path(dir, "testRenameWithQuota/srcdir/src1");
+    Path src2 = new Path(dir, "testRenameWithQuota/srcdir/src2");
+    Path dst1 = new Path(dir, "testRenameWithQuota/dstdir/dst1");
+    Path dst2 = new Path(dir, "testRenameWithQuota/dstdir/dst2");
+    createFile(fs, src1);
+    createFile(fs, src2);
+    fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
+        FSConstants.QUOTA_DONT_SET);
+    fs.mkdirs(dst1.getParent());
+    fs.setQuota(dst1.getParent(), FSConstants.QUOTA_DONT_SET,
+        FSConstants.QUOTA_DONT_SET);
+
+    // Test1: src does not exceed quota and dst has quota to accommodate rename
+    rename(src1, dst1, true, false);
+
+    // Test2: src does not exceed quota and dst has *no* quota to accommodate
+    // rename
+    fs.setQuota(dst1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
+    rename(src2, dst2, false, true);
+
+    // Test3: src exceeds quota and dst has *no* quota to accommodate rename
+    fs.setQuota(src1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
+    rename(dst1, src1, false, true);
+  }
+  
+  private void rename(Path src, Path dst, boolean renameSucceeds,
+      boolean quotaException) throws Exception {
+    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+    try {
+      assertEquals(renameSucceeds, fs.rename(src, dst));
+    } catch (QuotaExceededException ex) {
+      assertTrue(quotaException);
     }
+    assertEquals(renameSucceeds, !fs.exists(src));
+    assertEquals(renameSucceeds, fs.exists(dst));
   }
 }