You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/12/17 04:40:30 UTC

svn commit: r1422748 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ s...

Author: szetszwo
Date: Mon Dec 17 03:40:27 2012
New Revision: 1422748

URL: http://svn.apache.org/viewvc?rev=1422748&view=rev
Log:
HDFS-4317. Change INode and its subclasses to support HDFS-4103.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt Mon Dec 17 03:40:27 2012
@@ -83,3 +83,5 @@ Branch-2802 Snapshot (Unreleased)
   structure and modifications. (Jing Zhao via suresh)
 
   HDFS-4293. Fix TestSnapshot failure. (Jing Zhao via suresh)
+
+  HDFS-4317. Change INode and its subclasses to support HDFS-4103. (szetszwo)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Mon Dec 17 03:40:27 2012
@@ -234,9 +234,10 @@ public class DFSUtil {
    * Given a list of path components returns a path as a UTF8 String
    */
   public static String byteArray2PathString(byte[][] pathComponents) {
-    if (pathComponents.length == 0)
+    if (pathComponents.length == 0) {
       return "";
-    if (pathComponents.length == 1 && pathComponents[0].length == 0) {
+    } else if (pathComponents.length == 1
+        && (pathComponents[0] == null || pathComponents[0].length == 0)) {
       return Path.SEPARATOR;
     }
     try {
@@ -249,9 +250,8 @@ public class DFSUtil {
       }
       return result.toString();
     } catch (UnsupportedEncodingException ex) {
-      assert false : "UTF8 encoding is not supported ";
+      throw new AssertionError("UTF-8 encoding is not supported.");
     }
-    return null;
   }
 
   /** Convert an object representing a path to a string. */

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Dec 17 03:40:27 2012
@@ -383,7 +383,7 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       // file is closed
-      file.setModificationTimeForce(now);
+      file.setModificationTime(now);
       fsImage.getEditLog().logCloseFile(path, file);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
@@ -585,8 +585,8 @@ public class FSDirectory implements Clos
               + src + " is renamed to " + dst);
         }
         // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+        srcInodes[srcInodes.length-2].updateModificationTime(timestamp);
+        dstInodes[dstInodes.length-2].updateModificationTime(timestamp);
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);        
         return true;
@@ -752,8 +752,8 @@ public class FSDirectory implements Clos
               "DIR* FSDirectory.unprotectedRenameTo: " + src
               + " is renamed to " + dst);
         }
-        srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+        srcInodes[srcInodes.length - 2].updateModificationTime(timestamp);
+        dstInodes[dstInodes.length - 2].updateModificationTime(timestamp);
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
@@ -994,8 +994,8 @@ public class FSDirectory implements Clos
       count++;
     }
     
-    trgInode.setModificationTimeForce(timestamp);
-    trgParent.setModificationTime(timestamp);
+    trgInode.setModificationTime(timestamp);
+    trgParent.updateModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
@@ -1133,7 +1133,7 @@ public class FSDirectory implements Clos
       return 0;
     }
     // set the parent's modification time
-    inodes[inodes.length - 2].setModificationTime(mtime);
+    inodes[inodes.length - 2].updateModificationTime(mtime);
     int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
@@ -1167,35 +1167,14 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Replaces the specified INode.
-   */
-  private void replaceINodeUnsynced(String path, INode oldnode, INode newnode
-      ) throws IOException {    
-    //remove the old node from the namespace 
-    if (!oldnode.removeNode()) {
-      final String mess = "FSDirectory.replaceINodeUnsynced: failed to remove "
-          + path;
-      NameNode.stateChangeLog.warn("DIR* " + mess);
-      throw new IOException(mess);
-    } 
-    
-    //add the new node
-    rootDir.addINode(path, newnode); 
-  }
-
-  /**
    * Replaces the specified INodeDirectory.
    */
   public void replaceINodeDirectory(String path, INodeDirectory oldnode,
       INodeDirectory newnode) throws IOException {    
     writeLock();
     try {
-      replaceINodeUnsynced(path, oldnode, newnode);
-
-      //update children's parent directory
-      for(INode i : newnode.getChildrenList(null)) {
-        i.parent = newnode;
-      }
+      unprotectedReplaceINode(path, oldnode, newnode);
+      // Note that the parent of the children of the oldnode is already updated
     } finally {
       writeUnlock();
     }
@@ -1204,32 +1183,44 @@ public class FSDirectory implements Clos
   /**
    * Replaces the specified INodeFile with the specified one.
    */
-  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode
-      ) throws IOException {    
+  public void replaceINodeFile(String path, INodeFile oldnode,
+      INodeFile newnode) throws IOException {    
     writeLock();
     try {
-      unprotectedReplaceNode(path, oldnode, newnode);
+      unprotectedReplaceINodeFile(path, oldnode, newnode);
     } finally {
       writeUnlock();
     }
   }
-  
-  void unprotectedReplaceNode(String path, INodeFile oldnode, INodeFile newnode)
-      throws IOException, UnresolvedLinkException {
-    assert hasWriteLock();
-    INodeDirectory parent = oldnode.parent;
+
+  private void unprotectedReplaceINode(String path, INode oldnode,
+      INode newnode) throws IOException {    
+    Preconditions.checkState(hasWriteLock());
+
+    INodeDirectory parent = oldnode.getParent();
     // Remove the node from the namespace 
-    if (!oldnode.removeNode()) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
-                                   "failed to remove " + path);
-      throw new IOException("FSDirectory.replaceNode: " +
-                            "failed to remove " + path);
-    } 
+    if (parent == null) {
+      final String mess
+          = "FSDirectory.unprotectedReplaceINode: failed to remove " + path;
+      NameNode.stateChangeLog.warn("DIR* " + mess);
+      throw new IOException(mess);
+    }
     
-    // Parent should be non-null, otherwise oldnode.removeNode() will return
-    // false
-    newnode.setLocalName(oldnode.getLocalNameBytes());
+    final INode removed = parent.removeChild(oldnode);
+    Preconditions.checkState(removed == oldnode,
+        "removed != oldnode=%s, removed=%s", oldnode, removed);
+
+    parent = oldnode.getParent();
+    oldnode.setParent(null);
     parent.addChild(newnode, true);
+
+  }
+
+  void unprotectedReplaceINodeFile(String path, INodeFile oldnode,
+      INodeFile newnode)
+      throws IOException, UnresolvedLinkException {
+    unprotectedReplaceINode(path, oldnode, newnode);
+    newnode.setLocalName(oldnode.getLocalNameBytes());
     
     /* Currently oldnode and newnode are assumed to contain the same
      * blocks. Otherwise, blocks need to be removed from the blocksMap.
@@ -1326,9 +1317,16 @@ public class FSDirectory implements Clos
    * Get {@link INode} associated with the file / directory.
    */
   public INode getINode(String src) throws UnresolvedLinkException {
+    return getINodesInPath(src).getINode(0);
+  }
+
+  /**
+   * Get the {@link INodesInPath} resolved for the given file / directory path.
+   */
+  public INodesInPath getINodesInPath(String src) throws UnresolvedLinkException {
     readLock();
     try {
-      return rootDir.getNode(src, true);
+      return rootDir.getINodesInPath(src, true);
     } finally {
       readUnlock();
     }
@@ -1336,6 +1334,19 @@ public class FSDirectory implements Clos
   
   /**
    * Get {@link INode} associated with the file / directory.
+   */
+  public INodesInPath getMutableINodesInPath(String src
+      ) throws UnresolvedLinkException, SnapshotAccessControlException {
+    readLock();
+    try {
+      return rootDir.getMutableINodesInPath(src, true);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
   public INode getMutableINode(String src) throws UnresolvedLinkException,
@@ -2015,8 +2026,8 @@ public class FSDirectory implements Clos
         }
       } else {
         // a non-quota directory; so replace it with a directory with quota
-        INodeDirectoryWithQuota newNode = 
-          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+        final INodeDirectoryWithQuota newNode = new INodeDirectoryWithQuota(
+            dirNode, true, nsQuota, dsQuota);
         // non-root directory node; parent != null
         INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
         dirNode = newNode;
@@ -2084,7 +2095,7 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
-      inode.setModificationTimeForce(mtime);
+      inode.setModificationTime(mtime);
       status = true;
     }
     if (atime != -1) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Dec 17 03:40:27 2012
@@ -281,7 +281,7 @@ public class FSEditLogLoader {
       
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime);
-      newFile.setModificationTimeForce(addCloseOp.mtime);
+      newFile.setModificationTime(addCloseOp.mtime);
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -303,7 +303,7 @@ public class FSEditLogLoader {
       
       // Update the salient file attributes.
       oldFile.setAccessTime(addCloseOp.atime);
-      oldFile.setModificationTimeForce(addCloseOp.mtime);
+      oldFile.setModificationTime(addCloseOp.mtime);
       updateBlocks(fsDir, addCloseOp, oldFile);
 
       // Now close the file
@@ -320,8 +320,8 @@ public class FSEditLogLoader {
       if (oldFile.isUnderConstruction()) {
         INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
-        INodeFile newFile = ucFile.convertToInodeFile();
-        fsDir.unprotectedReplaceNode(addCloseOp.path, ucFile, newFile);
+        INodeFile newFile = ucFile.convertToInodeFile(ucFile.getModificationTime());
+        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
       }
       break;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Dec 17 03:40:27 2012
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -36,6 +37,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -44,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -202,7 +206,7 @@ class FSImageFormat {
     if (nsQuota != -1 || dsQuota != -1) {
       fsDir.rootDir.setQuota(nsQuota, dsQuota);
     }
-    fsDir.rootDir.setModificationTime(root.getModificationTime());
+    fsDir.rootDir.cloneModificationTime(root);
     fsDir.rootDir.clonePermissionStatus(root);    
   }
 
@@ -288,7 +292,7 @@ class FSImageFormat {
       }
       // check if the new inode belongs to the same parent
       if(!isParent(pathComponents, parentPath)) {
-        parentINode = fsDir.rootDir.getParent(pathComponents);
+        parentINode = getParentINodeDirectory(pathComponents);
         parentPath = getParent(pathComponents);
       }
 
@@ -298,12 +302,24 @@ class FSImageFormat {
     }
   }
 
+  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents
+      ) throws FileNotFoundException, PathIsNotDirectoryException,
+      UnresolvedLinkException {
+    if (pathComponents.length < 2) { // root
+      return null;
+    }
+    // Gets the parent INode
+    final INodesInPath inodes = namesystem.dir.rootDir.getExistingPathINodes(
+        pathComponents, 2, false);
+    return INodeDirectory.valueOf(inodes.getINode(0), pathComponents);
+  }
+
   /**
    * Add the child node to parent and, if child is a file, update block map.
    * This method is only used for image loading so that synchronization,
    * modification time update and space count update are not needed.
    */
-  void addToParent(INodeDirectory parent, INode child) {
+  private void addToParent(INodeDirectory parent, INode child) {
     // NOTE: This does not update space counts for parents
     if (!parent.addChild(child, false)) {
       return;
@@ -389,7 +405,7 @@ class FSImageFormat {
         // verify that file exists in namespace
         String path = cons.getLocalName();
         INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
-        fsDir.replaceNode(path, oldnode, cons);
+        fsDir.replaceINodeFile(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Dec 17 03:40:27 2012
@@ -1952,7 +1952,7 @@ public class FSNamesystem implements Nam
                                     leaseHolder,
                                     clientMachine,
                                     clientNode);
-    dir.replaceNode(src, file, cons);
+    dir.replaceINodeFile(src, file, cons);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -3261,8 +3261,8 @@ public class FSNamesystem implements Nam
 
     // The file is no longer pending.
     // Create permanent INode, update blocks
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
+    INodeFile newFile = pendingFile.convertToInodeFile(now());
+    dir.replaceINodeFile(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Dec 17 03:40:27 2012
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
@@ -46,9 +48,31 @@ import com.google.common.primitives.Sign
  */
 @InterfaceAudience.Private
 public abstract class INode implements Comparable<byte[]> {
+  public static final Log LOG = LogFactory.getLog(INode.class);
+
   static final ReadOnlyList<INode> EMPTY_READ_ONLY_LIST
       = ReadOnlyList.Util.emptyList();
 
+  /** A pair of objects. */
+  public static class Pair<L, R> {
+    public final L left;
+    public final R right;
+
+    public Pair(L left, R right) {
+      this.left = left;
+      this.right = right;
+    }
+  }
+
+  /** A triple of objects. */
+  public static class Triple<L, M, R> extends Pair<L, R> {
+    public final M middle;
+    
+    public Triple(L left, M middle, R right) {
+      super(left, right);
+      this.middle = middle;
+    }
+  }
 
   /** Wrapper of two counters for namespace consumed and diskspace consumed. */
   static class DirCounts {
@@ -154,6 +178,17 @@ public abstract class INode implements C
   }
 
   /**
+   * Create a copy of this inode for snapshot.
+   * 
+   * @return a pair of inodes, where the left inode is the current inode and
+   *         the right inode is the snapshot copy. The current inode usually is
+   *         the same object as this inode. However, in some cases, the inode
+   *         may be replaced with a new inode for maintaining snapshot data.
+   *         Then, the current inode is the new inode.
+   */
+  public abstract Pair<? extends INode, ? extends INode> createSnapshotCopy();
+
+  /**
    * Check whether this is the root inode.
    */
   boolean isRoot() {
@@ -246,11 +281,11 @@ public abstract class INode implements C
    * Get the quota set for this inode
    * @return the quota if it is set; -1 otherwise
    */
-  long getNsQuota() {
+  public long getNsQuota() {
     return -1;
   }
 
-  long getDsQuota() {
+  public long getDsQuota() {
     return -1;
   }
   
@@ -307,9 +342,24 @@ public abstract class INode implements C
 
   @Override
   public String toString() {
-    return "\"" + getFullPathName() + "\":"
-    + getUserName() + ":" + getGroupName() + ":"
-    + (isDirectory()? "d": "-") + getFsPermission();
+    return name == null? "<name==null>": getFullPathName();
+  }
+
+  @VisibleForTesting
+  public String getObjectString() {
+    final String s = super.toString();
+    return s.substring(s.lastIndexOf(getClass().getSimpleName()));
+  }
+
+  @VisibleForTesting
+  public String toStringWithObjectType() {
+    return toString() + "(" + getObjectString() + ")";
+  }
+
+  @VisibleForTesting
+  public String toDetailString() {
+    return toStringWithObjectType()
+        + ", parent=" + (parent == null? null: parent.toStringWithObjectType());
   }
 
   /**
@@ -333,20 +383,22 @@ public abstract class INode implements C
     return this.modificationTime;
   }
 
-  /**
-   * Set last modification time of inode.
-   */
-  public void setModificationTime(long modtime) {
+  /** Update modification time if it is larger than the current value. */
+  public void updateModificationTime(long modtime) {
     assert isDirectory();
     if (this.modificationTime <= modtime) {
       this.modificationTime = modtime;
     }
   }
 
+  void cloneModificationTime(INode that) {
+    this.modificationTime = that.modificationTime;
+  }
+
   /**
    * Always set the last modification time of inode.
    */
-  void setModificationTimeForce(long modtime) {
+  void setModificationTime(long modtime) {
     this.modificationTime = modtime;
   }
 
@@ -525,9 +577,9 @@ public abstract class INode implements C
     out.print(" ");
     out.print(getLocalName());
     out.print("   (");
-    final String s = super.toString();
-    out.print(s.substring(s.lastIndexOf(getClass().getSimpleName())));
-    out.println(")");
+    out.print(getObjectString());
+    out.print("), parent=");
+    out.println(parent == null? null: parent.getLocalName());
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Dec 17 03:40:27 2012
@@ -32,11 +32,13 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Directory INode class.
@@ -88,6 +90,12 @@ public class INodeDirectory extends INod
       }
     }
   }
+
+  @Override
+  public Pair<INodeDirectory, INodeDirectory> createSnapshotCopy() {
+    return new Pair<INodeDirectory, INodeDirectory>(this,
+        new INodeDirectory(this, false));
+  }
   
   /** @return true unconditionally. */
   @Override
@@ -106,13 +114,27 @@ public class INodeDirectory extends INod
     }
   }
 
-  private int searchChildren(INode inode) {
-    return Collections.binarySearch(children, inode.getLocalNameBytes());
+  private int searchChildren(byte[] name) {
+    return Collections.binarySearch(children, name);
+  }
+
+  protected int searchChildrenForExistingINode(byte[] name) {
+    assertChildrenNonNull();
+    final int i = searchChildren(name);
+    if (i < 0) {
+      throw new AssertionError("Child not found: name="
+          + DFSUtil.bytes2String(name));
+    }
+    return i;
+  }
+  
+  protected INode getExistingChild(int i) {
+    return children.get(i);
   }
 
   INode removeChild(INode node) {
     assertChildrenNonNull();
-    final int i = searchChildren(node);
+    final int i = searchChildren(node.getLocalNameBytes());
     return i >= 0? children.remove(i): null;
   }
 
@@ -123,7 +145,7 @@ public class INodeDirectory extends INod
   void replaceChild(INode newChild) {
     assertChildrenNonNull();
 
-    final int low = searchChildren(newChild);
+    final int low = searchChildren(newChild.getLocalNameBytes());
     if (low>=0) { // an old child exists so replace by the newChild
       children.get(low).parent = null;
       children.set(low, newChild);
@@ -132,6 +154,18 @@ public class INodeDirectory extends INod
     }
   }
 
+  /** Replace a child {@link INodeFile} with an {@link INodeFileWithLink}. */
+  INodeFileWithLink replaceINodeFile(final INodeFile child) {
+    assertChildrenNonNull();
+    Preconditions.checkArgument(!(child instanceof INodeFileWithLink),
+        "Child file is already an INodeFileWithLink, child=" + child);
+
+    final INodeFileWithLink newChild = new INodeFileWithLink(child);
+    final int i = searchChildrenForExistingINode(newChild.getLocalNameBytes());
+    children.set(i, newChild);
+    return newChild;
+  }
+
   private INode getChild(byte[] name, Snapshot snapshot) {
     final ReadOnlyList<INode> c = getChildrenList(snapshot);
     final int i = ReadOnlyList.Util.binarySearch(c, name);
@@ -236,7 +270,7 @@ public class INodeDirectory extends INod
         "Incorrect name " + getLocalName() + " expected "
         + (components[0] == null? null: DFSUtil.bytes2String(components[0]));
 
-    INodesInPath existing = new INodesInPath(numOfINodes);
+    INodesInPath existing = new INodesInPath(components, numOfINodes);
     INode curNode = this;
     int count = 0;
     int index = numOfINodes - components.length;
@@ -373,7 +407,7 @@ public class INodeDirectory extends INod
     if (children == null) {
       children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
     }
-    final int low = searchChildren(node);
+    final int low = searchChildren(node.getLocalNameBytes());
     if (low >= 0) {
       return false;
     }
@@ -381,7 +415,7 @@ public class INodeDirectory extends INod
     children.add(-low - 1, node);
     // update modification time of the parent directory
     if (setModTime)
-      setModificationTime(node.getModificationTime());
+      updateModificationTime(node.getModificationTime());
     if (node.getGroupName() == null) {
       node.setGroup(getGroupName());
     }
@@ -408,20 +442,12 @@ public class INodeDirectory extends INod
     }
     newNode.setLocalName(pathComponents[pathComponents.length - 1]);
     // insert into the parent children list
-    INodeDirectory parent = getParent(pathComponents);
+    final INodesInPath iip =  getExistingPathINodes(pathComponents, 2, false);
+    final INodeDirectory parent = INodeDirectory.valueOf(iip.getINode(0),
+        pathComponents);
     return parent.addChild(newNode, true);
   }
 
-  INodeDirectory getParent(byte[][] pathComponents
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
-    if (pathComponents.length < 2)  // add root
-      return null;
-    // Gets the parent INode
-    INodesInPath inodes =  getExistingPathINodes(pathComponents, 2, false);
-    return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
-  }
-
   @Override
   DirCounts spaceConsumedInTree(DirCounts counts) {
     counts.nsCount += 1;
@@ -465,12 +491,14 @@ public class INodeDirectory extends INod
   }
 
   /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current directory.
    * @return the current children list if the specified snapshot is null;
    *         otherwise, return the children list corresponding to the snapshot.
    *         Note that the returned list is never null.
    */
   public ReadOnlyList<INode> getChildrenList(final Snapshot snapshot) {
-    //TODO: use snapshot to select children list
     return children == null ? EMPTY_READ_ONLY_LIST
         : ReadOnlyList.Util.asReadOnlyList(children);
   }
@@ -499,6 +527,7 @@ public class INodeDirectory extends INod
    * Contains INodes information resolved from a given path.
    */
   static class INodesInPath {
+    private final byte[][] path;
     /**
      * Array with the specified number of INodes resolved for a given path.
      */
@@ -528,7 +557,8 @@ public class INodeDirectory extends INod
      */
     private Snapshot snapshot = null; 
 
-    INodesInPath(int number) {
+    private INodesInPath(byte[][] path, int number) {
+      this.path = path;
       assert (number >= 0);
       inodes = new INode[number];
       capacity = number;
@@ -578,10 +608,15 @@ public class INodeDirectory extends INod
     }
     
     /** @return the i-th inode. */
-    INode getINode(int i) {
+    public INode getINode(int i) {
       return inodes[i];
     }
     
+    /** @return the last inode. */
+    public INode getLastINode() {
+      return inodes[inodes.length - 1];
+    }
+    
     /**
      * @return index of the {@link INodeDirectoryWithSnapshot} in
      *         {@link #inodes} for snapshot path, else -1.
@@ -621,8 +656,17 @@ public class INodeDirectory extends INod
 
     @Override
     public String toString() {
+      return toString(true);
+    }
+
+    private String toString(boolean vaildateObject) {
+      if (vaildateObject) {
+        vaildate();
+      }
+
       final StringBuilder b = new StringBuilder(getClass().getSimpleName())
-          .append(":\n  inodes = ");
+          .append(": path = ").append(DFSUtil.byteArray2PathString(path))
+          .append("\n  inodes = ");
       if (inodes == null) {
         b.append("null");
       } else if (inodes.length == 0) {
@@ -641,6 +685,31 @@ public class INodeDirectory extends INod
        .append("\n  snapshot          = ").append(snapshot);
       return b.toString();
     }
+
+    void vaildate() {
+      // check parent up to snapshotRootIndex or numNonNull
+      final int n = snapshotRootIndex >= 0? snapshotRootIndex + 1: numNonNull;  
+      int i = 0;
+      if (inodes[i] != null) {
+        for(i++; i < n && inodes[i] != null; i++) {
+          final INodeDirectory parent_i = inodes[i].getParent();
+          final INodeDirectory parent_i_1 = inodes[i-1].getParent();
+          if (parent_i != inodes[i-1] &&
+              (parent_i_1 == null || !parent_i_1.isSnapshottable()
+                  || parent_i != parent_i_1)) {
+            throw new AssertionError(
+                "inodes[" + i + "].getParent() != inodes[" + (i-1)
+                + "]\n  inodes[" + i + "]=" + inodes[i].toDetailString()
+                + "\n  inodes[" + (i-1) + "]=" + inodes[i-1].toDetailString()
+                + "\n this=" + toString(false));
+          }
+        }
+      }
+      if (i != n) {
+        throw new AssertionError("i = " + i + " != " + n
+            + ", this=" + toString(false));
+      }
+    }
   }
 
   /*

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Mon Dec 17 03:40:27 2012
@@ -42,9 +42,9 @@ public class INodeDirectoryWithQuota ext
   * @param dsQuota Diskspace quota to be assigned to this inode
    * @param other The other inode from which all other properties are copied
    */
-  protected INodeDirectoryWithQuota(long nsQuota, long dsQuota,
-      INodeDirectory other) {
-    super(other, true);
+  protected INodeDirectoryWithQuota(INodeDirectory other, boolean adopt,
+      long nsQuota, long dsQuota) {
+    super(other, adopt);
     INode.DirCounts counts = new INode.DirCounts();
     other.spaceConsumedInTree(counts);
     this.nsCount = counts.getNsCount();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Dec 17 03:40:27 2012
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
@@ -89,16 +91,27 @@ public class INodeFile extends INode imp
   INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
                       short replication, long modificationTime,
                       long atime, long preferredBlockSize) {
-    super(permissions, modificationTime, atime);
+    this(null, permissions, modificationTime, atime, blklist, replication,
+        preferredBlockSize);
+  }
+
+  INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime,
+      BlockInfo[] blklist, short replication, long preferredBlockSize) {
+    super(name, permissions, null, mtime, atime);
     header = HeaderFormat.combineReplication(header, replication);
     header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
     this.blocks = blklist;
   }
 
-  protected INodeFile(INodeFile f) {
-    this(f.getPermissionStatus(), f.getBlocks(), f.getFileReplication(),
-        f.getModificationTime(), f.getAccessTime(), f.getPreferredBlockSize());
-    this.setLocalName(f.getLocalNameBytes());
+  protected INodeFile(INodeFile that) {
+    super(that);
+    this.header = that.header;
+    this.blocks = that.blocks;
+  }
+
+  @Override
+  public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() {
+    return parent.replaceINodeFile(this).createSnapshotCopy();
   }
 
   /** @return true unconditionally. */

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Mon Dec 17 03:40:27 2012
@@ -72,9 +72,8 @@ public class INodeFileUnderConstruction 
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(perm, blocks, blockReplication, modificationTime, modificationTime,
-          preferredBlockSize);
-    setLocalName(name);
+    super(name, perm, modificationTime, modificationTime,
+        blocks, blockReplication, preferredBlockSize);
     this.clientName = clientName;
     this.clientMachine = clientMachine;
     this.clientNode = clientNode;
@@ -108,19 +107,14 @@ public class INodeFileUnderConstruction 
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
   //
-  INodeFile convertToInodeFile() {
+  INodeFile convertToInodeFile(long mtime) {
     assert allBlocksComplete() : "Can't finalize inode " + this
       + " since it contains non-complete blocks! Blocks are "
       + Arrays.asList(getBlocks());
     //TODO SNAPSHOT: may convert to INodeFileWithLink
-    INodeFile obj = new INodeFile(getPermissionStatus(),
-                                  getBlocks(),
-                                  getFileReplication(),
-                                  getModificationTime(),
-                                  getModificationTime(),
-                                  getPreferredBlockSize());
-    return obj;
-    
+    return new INodeFile(getLocalNameBytes(), getPermissionStatus(),
+        mtime, getModificationTime(),
+        getBlocks(), getFileReplication(), getPreferredBlockSize());
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Mon Dec 17 03:40:27 2012
@@ -43,6 +43,12 @@ public class INodeSymlink extends INode 
   }
 
   @Override
+  public Pair<INodeSymlink, INodeSymlink> createSnapshotCopy() {
+    return new Pair<INodeSymlink, INodeSymlink>(this, new INodeSymlink(this));
+  }
+
+  /** @return true unconditionally. */
+  @Override
   public boolean isSymlink() {
     return true;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Mon Dec 17 03:40:27 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -41,18 +40,10 @@ import com.google.common.annotations.Vis
  * by the namesystem and FSDirectory locks.
  */
 @InterfaceAudience.Private
-public class INodeDirectorySnapshottable extends INodeDirectoryWithQuota {
+public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
   static public INodeDirectorySnapshottable newInstance(
       final INodeDirectory dir, final int snapshotQuota) {
-    long nsq = -1L;
-    long dsq = -1L;
-
-    if (dir instanceof INodeDirectoryWithQuota) {
-      final INodeDirectoryWithQuota q = (INodeDirectoryWithQuota)dir;
-      nsq = q.getNsQuota();
-      dsq = q.getDsQuota();
-    }
-    return new INodeDirectorySnapshottable(nsq, dsq, dir, snapshotQuota);
+    return new INodeDirectorySnapshottable(dir, snapshotQuota);
   }
 
   /** Cast INode to INodeDirectorySnapshottable. */
@@ -90,9 +81,9 @@ public class INodeDirectorySnapshottable
   /** Number of snapshots allowed. */
   private int snapshotQuota;
 
-  private INodeDirectorySnapshottable(long nsQuota, long dsQuota,
-      INodeDirectory dir, final int snapshotQuota) {
-    super(nsQuota, dsQuota, dir);
+  private INodeDirectorySnapshottable(INodeDirectory dir,
+      final int snapshotQuota) {
+    super(dir, true);
     setSnapshotQuota(snapshotQuota);
   }
   
@@ -196,8 +187,8 @@ public class INodeDirectorySnapshottable
 
     //set modification time
     final long timestamp = Time.now();
-    s.getRoot().setModificationTime(timestamp);
-    setModificationTime(timestamp);
+    s.getRoot().updateModificationTime(timestamp);
+    updateModificationTime(timestamp);
     return s;
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Mon Dec 17 03:40:27 2012
@@ -23,9 +23,12 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
+
+import com.google.common.base.Preconditions;
 
 /** The directory with snapshots. */
-public class INodeDirectoryWithSnapshot extends INodeDirectory {
+public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   /**
    * The difference between the current state and a previous snapshot
    * of an INodeDirectory.
@@ -79,6 +82,13 @@ public class INodeDirectoryWithSnapshot 
       return inodes == null? -1: Collections.binarySearch(inodes, name);
     }
 
+    private static void remove(final List<INode> inodes, final int i,
+        final INode expected) {
+      final INode removed = inodes.remove(-i - 1);
+      Preconditions.checkState(removed == expected,
+          "removed != expected=%s, removed=%s.", expected, removed);
+    }
+
     /** c-list: inode(s) created in current. */
     private List<INode> created;
     /** d-list: inode(s) deleted from current. */
@@ -116,82 +126,126 @@ public class INodeDirectoryWithSnapshot 
       deleted.add(-i - 1, inode);
     }
 
-    /** Create an inode in current state. */
-    void create(final INode inode) {
+    /**
+     * Create an inode in current state.
+     * @return the c-list insertion point for undo.
+     */
+    int create(final INode inode) {
       final int c = search(created, inode);
       insertCreated(inode, c);
+      return c;
     }
 
-    /** Delete an inode from current state. */
-    void delete(final INode inode) {
+    void undoCreate(final INode inode, final int insertionPoint) {
+      remove(created, insertionPoint, inode);
+    }
+
+    /**
+     * Delete an inode from current state.
+     * @return a triple for undo.
+     */
+    Triple<Integer, INode, Integer> delete(final INode inode) {
       final int c = search(created, inode);
+      INode previous = null;
+      Integer d = null;
       if (c >= 0) {
         // remove a newly created inode
-        created.remove(c);
+        previous = created.remove(c);
       } else {
         // not in c-list, it must be in previous
-        final int d = search(deleted, inode);
+        d = search(deleted, inode);
         insertDeleted(inode, d);
       }
+      return new Triple<Integer, INode, Integer>(c, previous, d);
+    }
+    
+    void undoDelete(final INode inode,
+        final Triple<Integer, INode, Integer> undoInfo) {
+      final int c = undoInfo.left;
+      if (c >= 0) {
+        created.add(c, undoInfo.middle);
+      } else {
+        remove(deleted, undoInfo.right, inode);
+      }
     }
 
-    /** Modify an inode in current state. */
-    void modify(final INode oldinode, final INode newinode) {
+    /**
+     * Modify an inode in current state.
+     * @return a triple for undo.
+     */
+    Triple<Integer, INode, Integer> modify(final INode oldinode, final INode newinode) {
       if (!oldinode.equals(newinode)) {
         throw new AssertionError("The names do not match: oldinode="
             + oldinode + ", newinode=" + newinode);
       }
       final int c = search(created, newinode);
+      INode previous = null;
+      Integer d = null;
       if (c >= 0) {
         // inode is already in c-list,
-        created.set(c, newinode);
+        previous = created.set(c, newinode);
       } else {
-        final int d = search(deleted, oldinode);
+        d = search(deleted, oldinode);
         if (d < 0) {
           // neither in c-list nor d-list
           insertCreated(newinode, c);
           insertDeleted(oldinode, d);
         }
       }
+      return new Triple<Integer, INode, Integer>(c, previous, d);
+    }
+
+    void undoModify(final INode oldinode, final INode newinode,
+        final Triple<Integer, INode, Integer> undoInfo) {
+      final int c = undoInfo.left;
+      if (c >= 0) {
+        created.set(c, undoInfo.middle);
+      } else {
+        final int d = undoInfo.right;
+        if (d < 0) {
+          remove(created, c, newinode);
+          remove(deleted, d, oldinode);
+        }
+      }
     }
 
     /**
-     * Given an inode in current state, find the corresponding inode in previous
-     * snapshot. The inodes in current state and previous snapshot can possibly
-     * be the same.
-     *
-     * @param inodeInCurrent The inode, possibly null, in current state.
-     * @return null if the inode is not found in previous snapshot;
-     *         otherwise, return the corresponding inode in previous snapshot.
+     * Find an inode in the previous snapshot.
+     * @return null if the inode cannot be determined in the previous snapshot
+     *         since no change is recorded and it should be determined in the
+     *         current snapshot; otherwise, return an array with size one
+     *         containing the inode in the previous snapshot. Note that the
+     *         inode can possibly be null which means that the inode is not
+     *         found in the previous snapshot.
      */
-    INode accessPrevious(byte[] name, INode inodeInCurrent) {
-      return accessPrevious(name, inodeInCurrent, created, deleted);
+    INode[] accessPrevious(byte[] name) {
+      return accessPrevious(name, created, deleted);
     }
 
-    private static INode accessPrevious(byte[] name, INode inodeInCurrent,
+    private static INode[] accessPrevious(byte[] name,
         final List<INode> clist, final List<INode> dlist) {
       final int d = search(dlist, name);
       if (d >= 0) {
         // the inode was in previous and was once deleted in current.
-        return dlist.get(d);
+        return new INode[]{dlist.get(d)};
       } else {
         final int c = search(clist, name);
         // When c >= 0, the inode in current is a newly created inode.
-        return c >= 0? null: inodeInCurrent;
+        return c >= 0? new INode[]{null}: null;
       }
     }
 
     /**
-     * Given an inode in previous snapshot, find the corresponding inode in
-     * current state. The inodes in current state and previous snapshot can
-     * possibly be the same.
-     *
-     * @param inodeInPrevious The inode, possibly null, in previous snapshot.
-     * @return null if the inode is not found in current state;
-     *         otherwise, return the corresponding inode in current state.
+     * Find an inode in the current snapshot.
+     * @return null if the inode cannot be determined in the current snapshot
+     *         since no change is recorded and it should be determined in the
+     *         previous snapshot; otherwise, return an array with size one
+     *         containing the inode in the current snapshot. Note that the
+     *         inode can possibly be null which means that the inode is not
+     *         found in the current snapshot.
      */
-    INode accessCurrent(byte[] name, INode inodeInPrevious) {
-      return accessPrevious(name, inodeInPrevious, deleted, created);
+    INode[] accessCurrent(byte[] name) {
+      return accessPrevious(name, deleted, created);
     }
 
     /**
@@ -230,15 +284,13 @@ public class INodeDirectoryWithSnapshot 
 
     /** Convert the inode list to a compact string. */
     static String toString(List<INode> inodes) {
-      if (inodes == null) {
-        return null;
-      } else if (inodes.isEmpty()) {
-        return "[]";
+      if (inodes == null || inodes.isEmpty()) {
+        return "<empty>";
       }
       final StringBuilder b = new StringBuilder("[")
-          .append(inodes.get(0).getLocalName());
+          .append(inodes.get(0));
       for(int i = 1; i < inodes.size(); i++) {
-        b.append(", ").append(inodes.get(i).getLocalName());
+        b.append(", ").append(inodes.get(i));
       }
       return b.append("]").toString();
     }
@@ -246,13 +298,12 @@ public class INodeDirectoryWithSnapshot 
     @Override
     public String toString() {
       return getClass().getSimpleName()
-          + ":\n  created=" + toString(created)
-          + "\n  deleted=" + toString(deleted);
+          + "{created=" + toString(created)
+          + ", deleted=" + toString(deleted) + "}";
     }
   }
 
-  INodeDirectoryWithSnapshot(String name, INodeDirectory dir) {
-    super(name, dir.getPermissionStatus());
-    parent = dir;
+  public INodeDirectoryWithSnapshot(INodeDirectory that, boolean adopt) {
+    super(that, adopt, that.getNsQuota(), that.getDsQuota());
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileSnapshot.java Mon Dec 17 03:40:27 2012
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 
 /**
  *  INode representing a snapshot of a file.
@@ -28,9 +27,10 @@ public class INodeFileSnapshot extends I
   /** The file size at snapshot creation time. */
   final long size;
 
-  public INodeFileSnapshot(INodeFile f, long size) {
+  INodeFileSnapshot(INodeFileWithLink f) {
     super(f);
-    this.size = size;
+    this.size = f.computeFileSize(true);
+    f.insert(this);
   }
 
   @Override
@@ -39,4 +39,4 @@ public class INodeFileSnapshot extends I
     //since files in a snapshot are considered as closed.
     return size;
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java Mon Dec 17 03:40:27 2012
@@ -37,6 +37,12 @@ public class INodeFileWithLink extends I
     next = this;
   }
 
+  @Override
+  public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() {
+    return new Pair<INodeFileWithLink, INodeFileSnapshot>(this,
+        new INodeFileSnapshot(this));
+  }
+
   void setNext(INodeFileWithLink next) {
     this.next = next;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Mon Dec 17 03:40:27 2012
@@ -44,7 +44,8 @@ public class Snapshot implements Compara
 
   Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
     this.id = id;
-    this.root = new INodeDirectoryWithSnapshot(name, dir);
+    this.root = new INodeDirectoryWithSnapshot(dir, false);
+    this.root.setLocalName(name);
   }
 
   /** @return the root directory of the snapshot. */

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Mon Dec 17 03:40:27 2012
@@ -210,9 +210,6 @@ public class SnapshotManager implements 
      */
     private INodeFileSnapshot processINodeFile(final INodeDirectory parent,
         final INodeFile file) {
-      final INodeFileSnapshot snapshot = new INodeFileSnapshot(
-          file, file.computeFileSize(true)); 
-
       final INodeFileWithLink srcWithLink;
       //check source INode type
       if (file instanceof INodeFileWithLink) {
@@ -228,6 +225,7 @@ public class SnapshotManager implements 
       }
       
       //insert the snapshot to src's linked list.
+      final INodeFileSnapshot snapshot = new INodeFileSnapshot(srcWithLink); 
       srcWithLink.insert(snapshot);
       return snapshot;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java Mon Dec 17 03:40:27 2012
@@ -150,6 +150,7 @@ public class SnapshotTestHelper {
       list.add(node);
     }
 
+    int id = 0;
     /**
      * Recursively generate the tree based on the height.
      * 
@@ -162,9 +163,9 @@ public class SnapshotTestHelper {
         return;
       }
       parent.leftChild = new Node(new Path(parent.nodePath,
-          "leftChild"), height - level, parent, fs);
+          "left" + ++id), height - level, parent, fs);
       parent.rightChild = new Node(new Path(parent.nodePath,
-          "rightChild"), height - level, parent, fs);
+          "right" + ++id), height - level, parent, fs);
       addDirNode(parent.leftChild, parent.leftChild.level);
       addDirNode(parent.rightChild, parent.rightChild.level);
       genChildren(parent.leftChild, level - 1);
@@ -254,11 +255,11 @@ public class SnapshotTestHelper {
        * Create files and add them in the fileList. Initially the last element
        * in the fileList is set to null (where we start file creation).
        */
-      void initFileList(long fileLen, short replication, long seed, int numFiles)
+      void initFileList(String namePrefix, long fileLen, short replication, long seed, int numFiles)
           throws Exception {
         fileList = new ArrayList<Path>(numFiles);
         for (int i = 0; i < numFiles; i++) {
-          Path file = new Path(nodePath, "file" + i);
+          Path file = new Path(nodePath, namePrefix + "-f" + i);
           fileList.add(file);
           if (i < numFiles - 1) {
             DFSTestUtil.createFile(fs, file, fileLen, replication, seed);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java Mon Dec 17 03:40:27 2012
@@ -24,6 +24,7 @@ import java.util.Random;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.Triple;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.Diff;
 import org.junit.Assert;
@@ -33,7 +34,8 @@ import org.junit.Test;
  * Test {@link INodeDirectoryWithSnapshot}, especially, {@link Diff}.
  */
 public class TestINodeDirectoryWithSnapshot {
-  private static Random RANDOM = new Random();
+  private static final Random RANDOM = new Random();
+  private static final int UNDO_TEST_P = 10;
   private static final PermissionStatus PERM = PermissionStatus.createImmutable(
       "user", "group", FsPermission.createImmutable((short)0));
 
@@ -139,10 +141,14 @@ public class TestINodeDirectoryWithSnaps
       for(int m = 0; m < n; m++) {
         final INode inode = newINode(m, width);
         {// test accessPrevious
-          final int i = Diff.search(current, inode);
-          final INode inodeInCurrent = i < 0? null: current.get(i);
-          final INode computed = diff.accessPrevious(
-              inode.getLocalNameBytes(), inodeInCurrent);
+          final INode[] array = diff.accessPrevious(inode.getLocalNameBytes());
+          final INode computed;
+          if (array != null) {
+            computed = array[0];
+          } else {
+            final int i = Diff.search(current, inode);
+            computed = i < 0? null: current.get(i);
+          }
 
           final int j = Diff.search(previous, inode);
           final INode expected = j < 0? null: previous.get(j);
@@ -151,10 +157,14 @@ public class TestINodeDirectoryWithSnaps
         }
 
         {// test accessCurrent
-          final int i = Diff.search(previous, inode);
-          final INode inodeInPrevious = i < 0? null: previous.get(i);
-          final INode computed = diff.accessCurrent(
-              inode.getLocalNameBytes(), inodeInPrevious);
+          final INode[] array = diff.accessCurrent(inode.getLocalNameBytes());
+          final INode computed;
+          if (array != null) {
+            computed = array[0]; 
+          } else {
+            final int i = Diff.search(previous, inode);
+            computed = i < 0? null: previous.get(i);
+          }
 
           final int j = Diff.search(current, inode);
           final INode expected = j < 0? null: current.get(j);
@@ -182,6 +192,10 @@ public class TestINodeDirectoryWithSnaps
     return true;
   }
 
+  static String toString(Diff diff) {
+    return diff.toString();
+  }
+
   static String toString(INode inode) {
     return inode == null? null
         : inode.getLocalName() + ":" + inode.getModificationTime();
@@ -202,7 +216,24 @@ public class TestINodeDirectoryWithSnaps
     Assert.assertTrue(i < 0);
     current.add(-i - 1, inode);
     if (diff != null) {
-      diff.create(inode);
+      //test undo with 1/UNDO_TEST_P probability
+      final boolean testUndo = RANDOM.nextInt(UNDO_TEST_P) == 0;
+      String before = null;
+      if (testUndo) {
+        before = toString(diff);
+      }
+
+      final int undoInfo = diff.create(inode);
+
+      if (testUndo) {
+        final String after = toString(diff);
+        //undo
+        diff.undoCreate(inode, undoInfo);
+        assertDiff(before, diff);
+        //re-do
+        diff.create(inode);
+        assertDiff(after, diff);
+      }
     }
   }
 
@@ -212,7 +243,24 @@ public class TestINodeDirectoryWithSnaps
         i >= 0);
     current.remove(i);
     if (diff != null) {
-      diff.delete(inode);
+      //test undo with 1/UNDO_TEST_P probability
+      final boolean testUndo = RANDOM.nextInt(UNDO_TEST_P) == 0;
+      String before = null;
+      if (testUndo) {
+        before = toString(diff);
+      }
+
+      final Triple<Integer, INode, Integer> undoInfo = diff.delete(inode);
+
+      if (testUndo) {
+        final String after = toString(diff);
+        //undo
+        diff.undoDelete(inode, undoInfo);
+        assertDiff(before, diff);
+        //re-do
+        diff.delete(inode);
+        assertDiff(after, diff);
+      }
     }
   }
 
@@ -220,14 +268,35 @@ public class TestINodeDirectoryWithSnaps
     final int i = Diff.search(current, inode);
     Assert.assertTrue(i >= 0);
     final INodeDirectory oldinode = (INodeDirectory)current.get(i);
-    final INodeDirectory newinode = new INodeDirectory(oldinode, true);
-    newinode.setModificationTime(oldinode.getModificationTime() + 1);
+    final INodeDirectory newinode = oldinode.createSnapshotCopy().right;
+    newinode.updateModificationTime(oldinode.getModificationTime() + 1);
 
     current.set(i, newinode);
     if (diff != null) {
-      diff.modify(oldinode, newinode);
+      //test undo with 1/UNDO_TEST_P probability
+      final boolean testUndo = RANDOM.nextInt(UNDO_TEST_P) == 0;
+      String before = null;
+      if (testUndo) {
+        before = toString(diff);
+      }
+
+      final Triple<Integer, INode, Integer> undoInfo = diff.modify(oldinode, newinode);
+
+      if (testUndo) {
+        final String after = toString(diff);
+        //undo
+        diff.undoModify(oldinode, newinode, undoInfo);
+        assertDiff(before, diff);
+        //re-do
+        diff.modify(oldinode, newinode);
+        assertDiff(after, diff);
+      }
     }
   }
+  
+  static void assertDiff(String s, Diff diff) {
+    Assert.assertEquals(s, toString(diff));
+  }
 
   /**
    * Test {@link Snapshot#ID_COMPARATOR}.

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java?rev=1422748&r1=1422747&r2=1422748&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java Mon Dec 17 03:40:27 2012
@@ -61,7 +61,7 @@ public class TestSnapshot {
   
   protected Configuration conf;
   protected MiniDFSCluster cluster;
-  protected FSNamesystem fsn;
+  protected static FSNamesystem fsn;
   protected DistributedFileSystem hdfs;
 
   private static Random random = new Random(Time.now());
@@ -205,7 +205,7 @@ public class TestSnapshot {
     for (TestDirectoryTree.Node node : nodes) {
       // If the node does not have files in it, create files
       if (node.fileList == null) {
-        node.initFileList(BLOCKSIZE, REPLICATION, seed, 6);
+        node.initFileList(node.nodePath.getName(), BLOCKSIZE, REPLICATION, seed, 6);
       }
       
       //
@@ -250,7 +250,8 @@ public class TestSnapshot {
       
       mList.add(create);
       mList.add(delete);
-      mList.add(append);
+      // TODO: fix append for snapshots
+//      mList.add(append);
       mList.add(chmod);
       mList.add(chown);
       mList.add(replication);
@@ -284,10 +285,10 @@ public class TestSnapshot {
    * Generate a random snapshot name.
    * @return The snapshot name
    */
-  private String genSnapshotName() {
-    return "s" + Math.abs(random.nextLong());
+  static String genSnapshotName() {
+    return String.format("s-%X", random.nextInt());
   }
-  
+
   /**
    * Base class to present changes applied to current file/dir. A modification
    * can be file creation, deletion, or other modifications such as appending on
@@ -496,6 +497,8 @@ public class TestSnapshot {
 
     @Override
     void modify() throws Exception {
+      System.out.println("BEFORE create " + file + "\n"
+              + fsn.getFSDirectory().getINode("/").dumpTreeRecursively());
       DFSTestUtil.createFile(fs, file, fileLen, fileLen, BLOCKSIZE,
           REPLICATION, seed);
     }
@@ -506,11 +509,12 @@ public class TestSnapshot {
         Path snapshotFile = SnapshotTestHelper.getSnapshotFile(fs,
             snapshotRoot, file);
         if (snapshotFile != null) {
-          boolean currentSnapshotFileExist = fs.exists(snapshotFile);
-          boolean originalSnapshotFileExist = 
-              fileStatusMap.get(snapshotFile) != null;
-          assertEquals(currentSnapshotFileExist, originalSnapshotFileExist);
-          if (currentSnapshotFileExist) {
+          boolean computed = fs.exists(snapshotFile);
+          boolean expected = fileStatusMap.get(snapshotFile) != null;
+          assertEquals("snapshotFile=" + snapshotFile + "\n"
+              + fsn.getFSDirectory().getINode("/").dumpTreeRecursively(),
+              expected, computed);
+          if (computed) {
             FileStatus currentSnapshotStatus = fs.getFileStatus(snapshotFile);
             FileStatus originalStatus = fileStatusMap.get(snapshotFile);
             // We compare the string because it contains all the information,
@@ -615,7 +619,7 @@ public class TestSnapshot {
         TestDirectoryTree.Node newChild = new TestDirectoryTree.Node(
             changedPath, node.level + 1, node, node.fs);
         // create file under the new non-snapshottable directory
-        newChild.initFileList(BLOCKSIZE, REPLICATION, seed, 2);
+        newChild.initFileList(node.nodePath.getName(), BLOCKSIZE, REPLICATION, seed, 2);
         node.nonSnapshotChildren.add(newChild);
       } else {
         // deletion