You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2009/10/23 23:21:16 UTC

svn commit: r829243 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ src/test/aop/org/apach...

Author: suresh
Date: Fri Oct 23 21:21:14 2009
New Revision: 829243

URL: http://svn.apache.org/viewvc?rev=829243&view=rev
Log:
HDFS-654. Add support new atomic rename functionality in HDFS for supporting rename in FileContext. Contributed by Suresh Srinivas.

Added:
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestRename.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Oct 23 21:21:14 2009
@@ -10,6 +10,9 @@
     implements offline erasure-coding.  It can be used to reduce the 
     total storage requirements of HDFS.  (dhruba)
 
+    HDFS-654. Add support new atomic rename functionality in HDFS for 
+    supporting rename in FileContext. (suresh)
+
   IMPROVEMENTS
     
     HDFS-704. Unify build property names to facilitate cross-projects

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Oct 23 21:21:14 2009
@@ -71,6 +71,7 @@
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -658,8 +659,10 @@
 
   /**
    * Rename file or directory.
-   * See {@link ClientProtocol#rename(String, String)}. 
+   * See {@link ClientProtocol#rename(String, String)}.
+   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
    */
+  @Deprecated
   public boolean rename(String src, String dst) throws IOException {
     checkOpen();
     try {
@@ -672,6 +675,21 @@
   }
 
   /**
+   * Rename file or directory.
+   * See {@link ClientProtocol#rename(String, String, Options.Rename...)}
+   */
+  public void rename(String src, String dst, Options.Rename... options) throws IOException {
+    checkOpen();
+    try {
+      namenode.rename(src, dst, options);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+  }
+
+  /**
    * Delete file or directory.
    * See {@link ClientProtocol#delete(String)}. 
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Oct 23 21:21:14 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Options;
 
 
 /****************************************************************
@@ -245,18 +246,23 @@
     return dfs.setReplication(getPathName(src), replication);
   }
 
-  /**
-   * Rename files/dirs
-   */
+  /** {@inheritDoc} */
+  @SuppressWarnings("deprecation")
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
     return dfs.rename(getPathName(src), getPathName(dst));
   }
 
-  /**
-   * requires a boolean check to delete a non 
-   * empty directory recursively.
+  /** 
+   * {@inheritDoc}
+   * This rename operation is guaranteed to be atomic.
    */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void rename(Path src, Path dst, Options.Rename... options) throws IOException {
+    dfs.rename(getPathName(src), getPathName(dst), options);
+  }
+  
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
    return dfs.delete(getPathName(f), recursive);

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Oct 23 21:21:14 2009
@@ -21,9 +21,11 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -44,9 +46,10 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 50: change LocatedBlocks to include last block information.
+   * 51: New rename method with support of destination overwrite for the use of
+   * {@link FileContext}
    */
-  public static final long versionID = 50L;
+  public static final long versionID = 51L;
   
   ///////////////////////////////////////
   // File contents
@@ -227,7 +230,6 @@
   ///////////////////////////////////////
   /**
    * Rename an item in the file system namespace.
-   * 
    * @param src existing file or directory name.
    * @param dst new name.
    * @return true if successful, or false if the old name does not exist
@@ -235,10 +237,34 @@
    * @throws IOException if the new name is invalid.
    * @throws QuotaExceededException if the rename would violate 
    *                                any quota restriction
+   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
    */
+  @Deprecated
   public boolean rename(String src, String dst) throws IOException;
 
   /**
+   * Rename src to dst.
+   * <ul>
+   * <li>Fails if src is a file and dst is a directory.
+   * <li>Fails if src is a directory and dst is a file.
+   * <li>Fails if the parent of dst does not exist or is a file.
+   * </ul>
+   * <p>
+   * Without OVERWRITE option, rename fails if the dst already exists.
+   * With OVERWRITE option, rename overwrites the dst, if it is a file 
+   * or an empty directory. Rename fails if dst is a non-empty directory.
+   * <p>
+   * This implementation of rename is atomic.
+   * <p>
+   * @param src existing file or directory name.
+   * @param dst new name.
+   * @param options Rename options
+   * @throws IOException if rename failed
+   */
+  public void rename(String src, String dst, Options.Rename... options)
+      throws IOException;
+  
+  /**
    * Delete the given file or directory from the file system.
    * <p>
    * Any blocks belonging to the deleted files will be garbage-collected.

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Oct 23 21:21:14 2009
@@ -91,9 +91,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -20;
+  public static final int LAYOUT_VERSION = -21;
   // Current version: 
-  // -20: DataNode adds a "rbw" sub directory to data directory
-  //      current dir contains "finalized" subdir for finalized replicas
-  //      and "rbw" subdir for replicas being written to.
+  // -21: Added new rename operation to edit log
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Oct 23 21:21:14 2009
@@ -29,9 +29,13 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -367,7 +371,9 @@
 
   /**
    * @see #unprotectedRenameTo(String, String, long)
+   * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
+  @Deprecated
   boolean renameTo(String src, String dst) throws QuotaExceededException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
@@ -381,15 +387,33 @@
     return true;
   }
 
-  /** Change a path name
+  /**
+   * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
+   */
+  void renameTo(String src, String dst, Options.Rename... options)
+      throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
+          + " to " + dst);
+    }
+    waitForReady();
+    long now = FSNamesystem.now();
+    unprotectedRenameTo(src, dst, now, options);
+    fsImage.getEditLog().logRename(src, dst, now, options);
+  }
+
+  /**
+   * Change a path name
    * 
    * @param src source path
    * @param dst destination path
    * @return true if rename succeeds; false otherwise
    * @throws QuotaExceededException if the operation violates any quota limit
+   * @deprecated See {@link #renameTo(String, String)}
    */
-  boolean unprotectedRenameTo(String src, String dst, long timestamp) 
-  throws QuotaExceededException {
+  @Deprecated
+  boolean unprotectedRenameTo(String src, String dst, long timestamp)
+      throws QuotaExceededException {
     synchronized (rootDir) {
       INode[] srcInodes = rootDir.getExistingPathINodes(src);
 
@@ -462,8 +486,8 @@
         if (dstChild != null) {
           srcChild = null;
           if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " + src
-                    + " is renamed to " + dst);
+            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
+                + src + " is renamed to " + dst);
           }
           // update modification time of dst and the parent of src
           srcInodes[srcInodes.length-2].setModificationTime(timestamp);
@@ -485,6 +509,171 @@
   }
 
   /**
+   * Rename src to dst.
+   * See {@link DistributedFileSystem#rename(Path, Path, Options.Rename...)}
+   * for details related to rename semantics.
+   * 
+   * @param src source path
+   * @param dst destination path
+   * @param timestamp modification time
+   * @param options Rename options
+   * @throws IOException if the operation violates any quota limit
+   */
+  void unprotectedRenameTo(String src, String dst, long timestamp,
+      Options.Rename... options) throws IOException {
+    boolean overwrite = false;
+    if (null != options) {
+      for (Rename option : options) {
+        if (option == Rename.OVERWRITE) {
+          overwrite = true;
+        }
+      }
+    }
+    String error = null;
+    synchronized (rootDir) {
+      final INode[] srcInodes = rootDir.getExistingPathINodes(src);
+      final INode srcInode = srcInodes[srcInodes.length - 1];
+      // validate source
+      if (srcInode == null) {
+        error = "rename source " + src + " is not found.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new FileNotFoundException(error);
+      }
+      if (srcInodes.length == 1) {
+        error = "rename source cannot be the root";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+
+      // validate of the destination
+      if (dst.equals(src)) {
+        return;
+      }
+      // dst cannot be a directory or a file under src
+      if (dst.startsWith(src) && 
+          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+        error = "Rename destination " + dst
+            + " is a directory or file under source " + src;
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      final byte[][] dstComponents = INode.getPathComponents(dst);
+      final INode[] dstInodes = new INode[dstComponents.length];
+      rootDir.getExistingPathINodes(dstComponents, dstInodes);
+      INode dstInode = dstInodes[dstInodes.length - 1];
+      if (dstInodes.length == 1) {
+        error = "rename destination cannot be the root";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      if (dstInode != null) { // Destination exists
+        if (dstInode.isDirectory() != srcInode.isDirectory()) {
+          error = "Source " + src + " Destination " + dst
+              + " both should be either file or directory";
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new IOException(error);
+        }
+        if (!overwrite) { // If destination exists, overwrite flag must be true
+          error = "rename destination " + dst + " already exists";
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new FileAlreadyExistsException(error);
+        }
+        List<INode> children = dstInode.isDirectory() ? 
+            ((INodeDirectory) dstInode).getChildrenRaw() : null;
+        if (children != null && children.size() != 0) {
+          error = "rename cannot overwrite non empty destination directory "
+              + dst;
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new IOException(error);
+        }
+      }
+      if (dstInodes[dstInodes.length - 2] == null) {
+        error = "rename destination parent " + dst + " not found.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new FileNotFoundException(error);
+      }
+      if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+        error = "rename destination parent " + dst + " is a file.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new ParentNotDirectoryException(error);
+      }
+
+      // Ensure dst has quota to accommodate rename
+      verifyQuotaForRename(srcInodes, dstInodes);
+      INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+      if (removedSrc == null) {
+        error = "Failed to rename " + src + " to " + dst
+            + " because the source can not be removed";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      final String srcChildName = removedSrc.getLocalName();
+      String dstChildName = null;
+      INode removedDst = null;
+      try {
+        if (dstInode != null) { // dst exists remove it
+          removedDst = removeChild(dstInodes, dstInodes.length - 1);
+          dstChildName = removedDst.getLocalName();
+        }
+
+        INode dstChild = null;
+        removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+        // add src as dst to complete rename
+        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+            removedSrc, -1, false);
+
+        if (dstChild != null) {
+          removedSrc = null;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog
+                .debug("DIR* FSDirectory.unprotectedRenameTo: " + src
+                    + " is renamed to " + dst);
+          }
+          srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+          dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+          // Collect the blocks and remove the lease for previous dst
+          if (removedDst != null) {
+            INode rmdst = removedDst;
+            removedDst = null;
+            List<Block> collectedBlocks = new ArrayList<Block>();
+            int filecount = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+            incrDeletedFileCount(filecount);
+            getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+          }
+          return;
+        }
+      } finally {
+        if (removedSrc != null) {
+          // Rename failed - restore src
+          removedSrc.setLocalName(srcChildName);
+          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, -1,
+              false);
+        }
+        if (removedDst != null) {
+          // Rename failed - restore dst
+          removedDst.setLocalName(dstChildName);
+          addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, -1,
+              false);
+        }
+      }
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst);
+      throw new IOException("rename from " + src + " to " + dst + " failed.");
+    }
+  }
+
+  /**
    * Set file replication
    * 
    * @param src file name
@@ -1084,6 +1273,10 @@
    */
   private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
       INode commonAncestor) throws QuotaExceededException {
+    if (nsDelta <= 0 && dsDelta <= 0) {
+      // if quota is being freed or not being consumed
+      return;
+    }
     if (pos>inodes.length) {
       pos = inodes.length;
     }
@@ -1122,10 +1315,21 @@
     for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
       commonAncestor = srcInodes[i];
     }
-    INode.DirCounts counts = new INode.DirCounts();
-    srcInode.spaceConsumedInTree(counts);
-    verifyQuota(dstInodes, dstInodes.length - 1, counts.getNsCount(),
-            counts.getDsCount(), commonAncestor);
+    INode.DirCounts srcCounts = new INode.DirCounts();
+    srcInode.spaceConsumedInTree(srcCounts);
+    long nsDelta = srcCounts.getNsCount();
+    long dsDelta = srcCounts.getDsCount();
+    
+    // Reduce the required quota by dst that is being removed
+    INode dstInode = dstInodes[dstInodes.length - 1];
+    if (dstInode != null) {
+      INode.DirCounts dstCounts = new INode.DirCounts();
+      dstInode.spaceConsumedInTree(dstCounts);
+      nsDelta -= dstCounts.getNsCount();
+      dsDelta -= dstCounts.getDsCount();
+    }
+    verifyQuota(dstInodes, dstInodes.length - 1, nsDelta, dsDelta,
+        commonAncestor);
   }
   
   /** Add a node child to the inodes at index pos. 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 23 21:21:14 2009
@@ -27,6 +27,8 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -45,6 +47,7 @@
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -57,7 +60,7 @@
 public class FSEditLog {
   public  static final byte OP_INVALID = -1;
   private static final byte OP_ADD = 0;
-  private static final byte OP_RENAME = 1;  // rename
+  private static final byte OP_RENAME_OLD = 1;  // rename
   private static final byte OP_DELETE = 2;  // delete
   private static final byte OP_MKDIR = 3;   // create directory
   private static final byte OP_SET_REPLICATION = 4; // set replication
@@ -74,6 +77,7 @@
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  private static final byte OP_RENAME = 15;  // new rename
   /* 
    * The following operations are used to control remote edit log streams,
    * and not logged into file streams.
@@ -408,6 +412,7 @@
     return numEdits;
   }
 
+  @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
                              boolean closeOnExit) throws IOException {
     FSNamesystem fsNamesys = fsimage.getFSNamesystem();
@@ -417,9 +422,9 @@
     String clientMachine = null;
     String path = null;
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
-        numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
+        numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpOther = 0;
+        numOpTimes = 0, numOpRename = 0, numOpOther = 0;
     try {
       while (true) {
         long timestamp = 0;
@@ -540,8 +545,8 @@
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
         } 
-        case OP_RENAME: {
-          numOpRename++;
+        case OP_RENAME_OLD: {
+          numOpRenameOld++;
           int length = in.readInt();
           if (length != 3) {
             throw new IOException("Incorrect data format. " 
@@ -672,6 +677,26 @@
           fsDir.unprotectedSetTimes(path, mtime, atime, true);
           break;
         }
+        case OP_RENAME: {
+          if (logVersion > -21) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpRename++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String s = FSImage.readString(in);
+          String d = FSImage.readString(in);
+          timestamp = readLong(in);
+          Rename[] options = readRenameOptions(in);
+          FileStatus dinfo = fsDir.getFileInfo(d);
+          fsDir.unprotectedRenameTo(s, d, timestamp, options);
+          fsNamesys.changeLease(s, d, dinfo);
+          break;
+        }
         default: {
           throw new IOException("Never seen opcode " + opcode);
         }
@@ -683,12 +708,14 @@
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
-          + " numOpDelete = " + numOpDelete + " numOpRename = " + numOpRename 
+          + " numOpDelete = " + numOpDelete 
+          + " numOpRenameOld = " + numOpRenameOld 
           + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
           + " numOpSetPerm = " + numOpSetPerm 
           + " numOpSetOwner = " + numOpSetOwner
           + " numOpSetGenStamp = " + numOpSetGenStamp 
           + " numOpTimes = " + numOpTimes
+          + " numOpRename = " + numOpRename
           + " numOpOther = " + numOpOther);
     }
     return numEdits;
@@ -933,7 +960,19 @@
       new DeprecatedUTF8(src),
       new DeprecatedUTF8(dst),
       FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+  }
+  
+  /** 
+   * Add rename record to edit log
+   */
+  void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+      new DeprecatedUTF8(src),
+      new DeprecatedUTF8(dst),
+      FSEditLog.toLogLong(timestamp)};
+    logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
+        toBytesWritable(options));
   }
   
   /** 
@@ -1443,4 +1482,25 @@
     processIOError(errorStreams, true);
     return regAllowed;
   }
+  
+  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+    BytesWritable writable = new BytesWritable();
+    writable.readFields(in);
+    
+    byte[] bytes = writable.getBytes();
+    Rename[] options = new Rename[bytes.length];
+    
+    for (int i = 0; i < bytes.length; i++) {
+      options[i] = Rename.valueOf(bytes[i]);
+    }
+    return options;
+  }
+  
+  static BytesWritable toBytesWritable(Options.Rename... options) {
+    byte[] bytes = new byte[options.length];
+    for (int i = 0; i < options.length; i++) {
+      bytes[i] = options[i].value();
+    }
+    return new BytesWritable(bytes);
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 23 21:21:14 2009
@@ -61,6 +61,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
@@ -1443,8 +1445,12 @@
   // are made, edit namespace and return to client.
   ////////////////////////////////////////////////////////////////
 
-  /** Change the indicated filename. */
-  public boolean renameTo(String src, String dst) throws IOException {
+  /** 
+   * Change the indicated filename. 
+   * @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
+   */
+  @Deprecated
+  boolean renameTo(String src, String dst) throws IOException {
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
@@ -1456,6 +1462,8 @@
     return status;
   }
 
+  /** @deprecated See {@link #renameTo(String, String)} */
+  @Deprecated
   private synchronized boolean renameToInternal(String src, String dst
       ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
@@ -1481,7 +1489,46 @@
     }
     return false;
   }
+  
+
+  /** Rename src to dst */
+  void renameTo(String src, String dst, Options.Rename... options)
+      throws IOException {
+    renameToInternal(src, dst, options);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled()) {
+      StringBuilder cmd = new StringBuilder("rename options=");
+      for (Rename option : options) {
+        cmd.append(option.value()).append(" ");
+      }
+      final FileStatus stat = dir.getFileInfo(dst);
+      logAuditEvent(UserGroupInformation.getCurrentUGI(), Server.getRemoteIp(),
+                    cmd.toString(), src, dst, stat);
+    }
+  }
+
+  private synchronized void renameToInternal(String src, String dst,
+      Options.Rename... options) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+          + src + " to " + dst);
+    }
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot rename " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new IOException("Invalid name: " + dst);
+    }
+    if (isPermissionEnabled) {
+      checkParentAccess(src, FsAction.WRITE);
+      checkAncestorAccess(dst, FsAction.WRITE);
+    }
 
+    FileStatus dinfo = dir.getFileInfo(dst);
+    dir.renameTo(src, dst, options);
+    changeLease(src, dst, dinfo); // update lease with new filename
+  }
+  
   /**
    * Remove the indicated filename from namespace. If the filename 
    * is a directory (non empty) and recursive is set to false then throw exception.

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Oct 23 21:21:14 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
@@ -697,8 +698,9 @@
     return namesystem.getPreferredBlockSize(filename);
   }
     
-  /**
-   */
+  /** {@inheritDoc} */
+  @Deprecated
+  @Override
   public boolean rename(String src, String dst) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
     if (!checkPathLength(dst)) {
@@ -711,6 +713,19 @@
     }
     return ret;
   }
+  
+
+  /** {@inheritDoc} */
+  @Override
+  public void rename(String src, String dst, Options.Rename... options) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    namesystem.renameTo(src, dst, options);
+    myMetrics.numFilesRenamed.inc();
+  }
 
   /**
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Oct 23 21:21:14 2009
@@ -96,7 +96,7 @@
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19, -20};
+  private static int [] versions = {-16, -17, -18, -19, -20, -21};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestRename.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestRename.java?rev=829243&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestRename.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestRename.java Fri Oct 23 21:21:14 2009
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Rename names src to dst. Rename is done using following steps:
+ * <ul>
+ * <li>Checks are made to ensure src exists and appropriate flags are being
+ * passed to overwrite existing destination.
+ * <li>src is removed.
+ * <li>dst if it exists is removed.
+ * <li>src is renamed and added to directory tree as dst.
+ * </ul>
+ * 
+ * During any of the above steps, the state of src and dst is reverted back to
+ * what it was prior to rename. This test ensures that the state is reverted
+ * back.
+ * 
+ * This test uses AspectJ to simulate failures.
+ */
+public class TestRename {
+  private static final Log LOG = LogFactory.getLog(TestRename.class);
+  private static String removeChild = "";
+  private static String addChild = "";
+  private static byte[] data = { 0 };
+  
+  private static String TEST_ROOT_DIR = 
+    System.getProperty("test.build.data", "/tmp") + "/test";
+  
+  private static Configuration CONF = new Configuration();
+  static {
+    CONF.setLong("dfs.block.size", 1);
+    CONF.setInt("io.bytes.per.checksum", 1);
+  }
+
+  private MiniDFSCluster cluster = null;
+  private FileContext fc = null;
+
+  @Before
+  public void setup() throws IOException {
+    restartCluster(true);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (fc != null) {
+      fc.delete(getTestRootPath(), true);
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private void restartCluster(boolean format) throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    cluster = new MiniDFSCluster(CONF, 1, format, null);
+    cluster.waitClusterUp();
+    fc = FileContext.getFileContext(cluster.getFileSystem());
+  }
+
+  /**
+   * Returns true to indicate an exception should be thrown to simulate failure
+   * during removal of a node from directory tree.
+   */
+  public static boolean throwExceptionOnRemove(String child) {
+    boolean status = removeChild.endsWith(child);
+    if (status) {
+      removeChild = "";
+    }
+    return status;
+  }
+
+  /**
+   * Returns true to indicate an exception should be thrown to simulate failure
+   * during addition of a node to directory tree.
+   */
+  public static boolean throwExceptionOnAdd(String child) {
+    boolean status = addChild.endsWith(child);
+    if (status) {
+      addChild = "";
+    }
+    return status;
+  }
+
+  /** Set child name on removal of which failure should be simulated */
+  public static void exceptionOnRemove(String child) {
+    removeChild = child;
+    addChild = "";
+  }
+
+  /** Set child name on addition of which failure should be simulated */
+  public static void exceptionOnAdd(String child) {
+    removeChild = "";
+    addChild = child;
+  }
+
+  private Path getTestRootPath() {
+    return fc.makeQualified(new Path(TEST_ROOT_DIR));
+  }
+
+  private Path getTestPath(String pathString) {
+    return fc.makeQualified(new Path(TEST_ROOT_DIR, pathString));
+  }
+
+  private void createFile(Path path) throws IOException {
+    FSDataOutputStream out = fc.create(path, EnumSet.of(CreateFlag.CREATE),
+        Options.CreateOpts.createParent());
+    out.write(data, 0, data.length);
+    out.close();
+  }
+
+  /** Rename test when src exists and dst does not */
+  @Test
+  public void testFailureNonExistentDst() throws Exception {
+    final Path src = getTestPath("testFailureNonExistenSrc/dir/src");
+    final Path dst = getTestPath("testFailureNonExistenSrc/newdir/dst");
+    createFile(src);
+
+    // During rename, while removing src, an exception is thrown
+    TestRename.exceptionOnRemove(src.toString());
+    rename(src, dst, true, true, false, Rename.NONE);
+
+    // During rename, while adding dst an exception is thrown
+    TestRename.exceptionOnAdd(dst.toString());
+    rename(src, dst, true, true, false, Rename.NONE);
+  }
+
+  /** Rename test when src and dst exist */
+  @Test
+  public void testFailuresExistingDst() throws Exception {
+    final Path src = getTestPath("testFailuresExistingDst/dir/src");
+    final Path dst = getTestPath("testFailuresExistingDst/newdir/dst");
+    createFile(src);
+    createFile(dst);
+
+    // During rename, while removing src, an exception is thrown
+    TestRename.exceptionOnRemove(src.toString());
+    rename(src, dst, true, true, true, Rename.OVERWRITE);
+
+    // During rename, while removing dst, an exception is thrown
+    TestRename.exceptionOnRemove(dst.toString());
+    rename(src, dst, true, true, true, Rename.OVERWRITE);
+
+    // During rename, while adding dst an exception is thrown
+    TestRename.exceptionOnAdd(dst.toString());
+    rename(src, dst, true, true, true, Rename.OVERWRITE);
+  }
+
+  /** Rename test where both src and dst are files */
+  @Test
+  public void testDeletionOfDstFile() throws Exception {
+    Path src = getTestPath("testDeletionOfDstFile/dir/src");
+    Path dst = getTestPath("testDeletionOfDstFile/newdir/dst");
+    createFile(src);
+    createFile(dst);
+
+    final FSNamesystem namesystem = cluster.getNamesystem();
+    final long blocks = namesystem.getBlocksTotal();
+    final long fileCount = namesystem.getFilesTotal();
+    rename(src, dst, false, false, true, Rename.OVERWRITE);
+
+    // After successful rename the blocks corresponing dst are deleted
+    Assert.assertEquals(blocks - 1, namesystem.getBlocksTotal());
+
+    // After successful rename dst file is deleted
+    Assert.assertEquals(fileCount - 1, namesystem.getFilesTotal());
+
+    // Restart the cluster to ensure new rename operation 
+    // recorded in editlog is processed right
+    restartCluster(false);
+    int count = 0;
+    boolean exception = true;
+    src = getTestPath("testDeletionOfDstFile/dir/src");
+    dst = getTestPath("testDeletionOfDstFile/newdir/dst");
+    while (exception && count < 5) {
+      try {
+        fc.exists(src);
+        exception = false;
+      } catch (Exception e) {
+        LOG.warn("Exception " + " count " + count + " " + e.getMessage());
+        Thread.sleep(1000);
+        count++;
+      }
+    }
+    Assert.assertFalse(fc.exists(src));
+    Assert.assertTrue(fc.exists(dst));
+  }
+
+  /** Rename test where both src and dst are directories */
+  @Test
+  public void testDeletionOfDstDirectory() throws Exception {
+    Path src = getTestPath("testDeletionOfDstDirectory/dir/src");
+    Path dst = getTestPath("testDeletionOfDstDirectory/newdir/dst");
+    fc.mkdir(src, FileContext.DEFAULT_PERM, true);
+    fc.mkdir(dst, FileContext.DEFAULT_PERM, true);
+
+    FSNamesystem namesystem = cluster.getNamesystem();
+    long fileCount = namesystem.getFilesTotal();
+    rename(src, dst, false, false, true, Rename.OVERWRITE);
+
+    // After successful rename dst directory is deleted
+    Assert.assertEquals(fileCount - 1, namesystem.getFilesTotal());
+    
+    // Restart the cluster to ensure new rename operation 
+    // recorded in editlog is processed right
+    restartCluster(false);
+    src = getTestPath("testDeletionOfDstDirectory/dir/src");
+    dst = getTestPath("testDeletionOfDstDirectory/newdir/dst");
+    int count = 0;
+    boolean exception = true;
+    while (exception && count < 5) {
+      try {
+        fc.exists(src);
+        exception = false;
+      } catch (Exception e) {
+        LOG.warn("Exception " + " count " + count + " " + e.getMessage());
+        Thread.sleep(1000);
+        count++;
+      }
+    }
+    Assert.assertFalse(fc.exists(src));
+    Assert.assertTrue(fc.exists(dst));
+  }
+
+  private void rename(Path src, Path dst, boolean exception, boolean srcExists,
+      boolean dstExists, Rename... options) throws IOException {
+    try {
+      fc.rename(src, dst, options);
+      Assert.assertFalse("Expected exception is not thrown", exception);
+    } catch (Exception e) {
+      LOG.warn("Exception ", e);
+      Assert.assertTrue(exception);
+    }
+    Assert.assertEquals(srcExists, fc.exists(src));
+    Assert.assertEquals(dstExists, fc.exists(dst));
+  }
+}

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj?rev=829243&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj Fri Oct 23 21:21:14 2009
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.TestRename;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+
+/**
+ * The aspects here are used for testing HDFS implementation of rename
+ * functionality. Failure is introduced during rename to test the atomicity of
+ * rename.
+ */
+public privileged aspect RenameAspects {
+  public static final Log LOG = LogFactory.getLog(RenameAspects.class);
+
+  /** When removeChild is called during rename, throw exception */
+  pointcut callRemove(INode[] inodes, int pos) : 
+    call(* FSDirectory.removeChild(INode[], int))
+    && args(inodes, pos)
+    && withincode (* FSDirectory.unprotectedRenameTo(String, 
+        String, long, Rename...));
+
+  before(INode[] inodes, int pos) throws RuntimeException :
+    callRemove(inodes, pos) {
+    LOG.info("FI: callRenameRemove");
+    if (TestRename.throwExceptionOnRemove(inodes[pos].getLocalName())) {
+      throw new RuntimeException("RenameAspects - on remove " + 
+          inodes[pos].getLocalName());
+    }
+  }
+
+  /** When addChildNoQuotaCheck is called during rename, throw exception */
+  pointcut callAddChildNoQuotaCheck(INode[] inodes, int pos, INode node, long diskspace, boolean flag) :
+    call(* FSDirectory.addChildNoQuotaCheck(INode[], int, INode, long, boolean)) 
+    && args(inodes, pos, node, diskspace, flag)
+    && withincode (* FSDirectory.unprotectedRenameTo(String, 
+        String, long, Rename...));
+
+  before(INode[] inodes, int pos, INode node, long diskspace, boolean flag)
+      throws RuntimeException : 
+      callAddChildNoQuotaCheck(inodes, pos, node, diskspace, flag) {
+    LOG.info("FI: callAddChildNoQuotaCheck");
+    if (TestRename.throwExceptionOnAdd(inodes[pos].getLocalName())) {
+      throw new RuntimeException("RenameAspects on add " + 
+          inodes[pos].getLocalName());
+    }
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java Fri Oct 23 21:21:14 2009
@@ -22,11 +22,12 @@
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -73,53 +74,130 @@
   } 
   
   @Override
+   protected IOException unwrapException(IOException e) {
+    if (e instanceof RemoteException) {
+      return ((RemoteException) e).unwrapRemoteException();
+    }
+    return e;
+  }
+  
   @Test
-  public void testRenameFileAsExistingFile() throws Exception {
-    // ignore base class test till hadoop-6240 is fixed
+  public void testOldRenameWithQuota() throws Exception {
+    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+    Path src1 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src1");
+    Path src2 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src2");
+    Path dst1 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst1");
+    Path dst2 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst2");
+    createFile(src1);
+    createFile(src2);
+    fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
+        FSConstants.QUOTA_DONT_SET);
+    fc.mkdir(dst1.getParent(), FileContext.DEFAULT_PERM, true);
+
+    fs.setQuota(dst1.getParent(), 2, FSConstants.QUOTA_DONT_SET);
+    /* 
+     * Test1: src does not exceed quota and dst has no quota check and hence 
+     * accommodates rename
+     */
+    oldRename(src1, dst1, true, false);
+
+    /*
+     * Test2: src does not exceed quota and dst has *no* quota to accommodate 
+     * rename. 
+     */
+    // dstDir quota = 1 and dst1 already uses it
+    oldRename(src2, dst2, false, true);
+
+    /*
+     * Test3: src exceeds quota and dst has *no* quota to accommodate rename
+     */
+    // src1 has no quota to accommodate new rename node
+    fs.setQuota(src1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
+    oldRename(dst1, src1, false, true);
   }
   
   @Test
   public void testRenameWithQuota() throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    Path src1 = getTestRootPath("testRenameWithQuota/srcdir/src1");
-    Path src2 = getTestRootPath("testRenameWithQuota/srcdir/src2");
-    Path dst1 = getTestRootPath("testRenameWithQuota/dstdir/dst1");
-    Path dst2 = getTestRootPath("testRenameWithQuota/dstdir/dst2");
+    Path src1 = getTestRootPath("test/testRenameWithQuota/srcdir/src1");
+    Path src2 = getTestRootPath("test/testRenameWithQuota/srcdir/src2");
+    Path dst1 = getTestRootPath("test/testRenameWithQuota/dstdir/dst1");
+    Path dst2 = getTestRootPath("test/testRenameWithQuota/dstdir/dst2");
     createFile(src1);
     createFile(src2);
     fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
         FSConstants.QUOTA_DONT_SET);
     fc.mkdir(dst1.getParent(), FileContext.DEFAULT_PERM, true);
-    fs.setQuota(dst1.getParent(), FSConstants.QUOTA_DONT_SET,
-        FSConstants.QUOTA_DONT_SET);
 
-    // Test1: src does not exceed quota and dst has no quota check and hence 
-    // accommodates rename
-    rename(src1, dst1, true, false);
-
-    // Test2: src does not exceed quota and dst has *no* quota to accommodate
-    // rename. 
-
-    // testRenameWithQuota/dstDir now has quota = 1 and dst1 already uses it
-    fs.setQuota(dst1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
-    rename(src2, dst2, false, true);
+    fs.setQuota(dst1.getParent(), 2, FSConstants.QUOTA_DONT_SET);
+    /* 
+     * Test1: src does not exceed quota and dst has no quota check and hence 
+     * accommodates rename
+     */
+    // rename uses dstdir quota=1
+    rename(src1, dst1, false, true, false, Rename.NONE);
+    // rename reuses dstdir quota=1
+    rename(src2, dst1, true, true, false, Rename.OVERWRITE);
+
+    /*
+     * Test2: src does not exceed quota and dst has *no* quota to accommodate 
+     * rename. 
+     */
+    // dstDir quota = 1 and dst1 already uses it
+    createFile(src2);
+    rename(src2, dst2, false, false, true, Rename.NONE);
 
-    // Test3: src exceeds quota and dst has *no* quota to accommodate rename
+    /*
+     * Test3: src exceeds quota and dst has *no* quota to accommodate rename
+     * rename to a destination that does not exist
+     */
+    // src1 has no quota to accommodate new rename node
+    fs.setQuota(src1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
+    rename(dst1, src1, false, false, true, Rename.NONE);
     
+    /*
+     * Test4: src exceeds quota and dst has *no* quota to accommodate rename
+     * rename to a destination that exists and quota freed by deletion of dst
+     * is same as quota needed by src.
+     */
     // src1 has no quota to accommodate new rename node
+    fs.setQuota(src1.getParent(), 100, FSConstants.QUOTA_DONT_SET);
+    createFile(src1);
     fs.setQuota(src1.getParent(), 1, FSConstants.QUOTA_DONT_SET);
-    rename(dst1, src1, false, true);
+    rename(dst1, src1, true, true, false, Rename.OVERWRITE);
+  }
+  
+  @Test
+  public void testRenameRoot() throws Exception {
+    Path src = getTestRootPath("test/testRenameRoot/srcdir/src1");
+    Path dst = new Path("/");
+    createFile(src);
+    rename(src, dst, true, false, true, Rename.OVERWRITE);
+    rename(dst, src, true, false, true, Rename.OVERWRITE);
   }
   
-  private void rename(Path src, Path dst, boolean renameSucceeds,
-      boolean quotaException) throws Exception {
+  private void oldRename(Path src, Path dst, boolean renameSucceeds,
+      boolean exception) throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
     try {
       Assert.assertEquals(renameSucceeds, fs.rename(src, dst));
-    } catch (QuotaExceededException ex) {
-      Assert.assertTrue(quotaException);
+    } catch (Exception ex) {
+      Assert.assertTrue(exception);
     }
     Assert.assertEquals(renameSucceeds, !fc.exists(src));
     Assert.assertEquals(renameSucceeds, fc.exists(dst));
   }
+  
+  private void rename(Path src, Path dst, boolean dstExists,
+      boolean renameSucceeds, boolean exception, Options.Rename... options)
+      throws Exception {
+    try {
+      fc.rename(src, dst, options);
+      Assert.assertTrue(renameSucceeds);
+    } catch (Exception ex) {
+      Assert.assertTrue(exception);
+    }
+    Assert.assertEquals(renameSucceeds, !fc.exists(src));
+    Assert.assertEquals((dstExists||renameSucceeds), fc.exists(dst));
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Oct 23 21:21:14 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
@@ -173,7 +174,10 @@
 
     public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
 
+    @Deprecated
     public boolean rename(String src, String dst) throws IOException { return false; }
+    
+    public void rename(String src, String dst, Rename... options) throws IOException { }
 
     public boolean delete(String src) throws IOException { return false; }
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=829243&r1=829242&r2=829243&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Fri Oct 23 21:21:14 2009
@@ -567,6 +567,7 @@
       super.parseArguments(args);
     }
 
+    @SuppressWarnings("deprecation")
     void generateInputs(int[] opsPerThread) throws IOException {
       // create files using opsPerThread
       String[] createArgs = new String[] {
@@ -668,6 +669,7 @@
       }
     }
 
+    @SuppressWarnings("deprecation")
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = System.currentTimeMillis();