You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/04/24 22:31:07 UTC

svn commit: r1471647 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ src/test/java/org/apache/hadoop/hdfs/se...

Author: szetszwo
Date: Wed Apr 24 20:31:06 2013
New Revision: 1471647

URL: http://svn.apache.org/r1471647
Log:
HDFS-4686. Update quota computation for rename and INodeReference.  Contributed by Jing Zhao

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt Wed Apr 24 20:31:06 2013
@@ -293,3 +293,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4738. Changes AbstractINodeDiff to implement Comparable<Integer>, and
   fix javadoc and other warnings.  (szetszwo)
+
+  HDFS-4686. Update quota computation for rename and INodeReference.
+  (Jing Zhao via szetszwo)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Apr 24 20:31:06 2013
@@ -614,14 +614,18 @@ public class FSDirectory implements Clos
     
     // check srcChild for reference
     final INodeReference.WithCount withCount;
+    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
     int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
         .getDstSnapshotId() : Snapshot.INVALID_ID;
     if (isSrcInSnapshot) {
       final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild); 
+          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); 
       withCount = (INodeReference.WithCount) withName.getReferredINode();
       srcChild = withName;
       srcIIP.setLastINode(srcChild);
+      // get the counts before rename
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
+          Snapshot.INVALID_ID);
     } else if (srcChildIsReference) {
       // srcChild is reference but srcChild is not in latest snapshot
       withCount = (WithCount) srcChild.asReference().getReferredINode();
@@ -661,8 +665,6 @@ public class FSDirectory implements Clos
         final INodeReference.DstReference ref = new INodeReference.DstReference(
             dstParent.asDirectory(), withCount,
             dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
-        withCount.setParentReference(ref);
-        withCount.incrementReferenceCount();
         toDst = ref;
       }
       
@@ -677,8 +679,18 @@ public class FSDirectory implements Clos
         srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
         dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);        
+        getFSNamesystem().unprotectedChangeLease(src, dst);     
 
+        // update the quota usage in src tree
+        if (isSrcInSnapshot) {
+          // get the counts after rename
+          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+          newSrcCounts.subtract(oldSrcCounts);
+          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+              newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
+        }
+        
         return true;
       }
     } finally {
@@ -689,16 +701,21 @@ public class FSDirectory implements Clos
         if (withCount == null) {
           srcChild.setLocalName(srcChildName);
         } else if (!srcChildIsReference) { // src must be in snapshot
+          // the withCount node will no longer be used thus no need to update
+          // its reference number here
           final INode originalChild = withCount.getReferredINode();
           srcChild = originalChild;
         } else {
+          withCount.removeReference(oldSrcChild.asReference());
           final INodeReference originalRef = new INodeReference.DstReference(
               srcParent, withCount, srcRefDstSnapshot);
-          withCount.setParentReference(originalRef);
           srcChild = originalRef;
         }
         
         if (isSrcInSnapshot) {
+          // srcParent must be an INodeDirectoryWithSnapshot instance since
+          // isSrcInSnapshot is true and src node has been removed from 
+          // srcParent 
           ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent(
               oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot());
         } else {
@@ -849,12 +866,16 @@ public class FSDirectory implements Clos
     final INodeReference.WithCount withCount;
     int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
         .getDstSnapshotId() : Snapshot.INVALID_ID;
+    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
     if (isSrcInSnapshot) {
       final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild); 
+          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); 
       withCount = (INodeReference.WithCount) withName.getReferredINode();
       srcChild = withName;
       srcIIP.setLastINode(srcChild);
+      // get the counts before rename
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
+          Snapshot.INVALID_ID);
     } else if (srcChildIsReference) {
       // srcChild is reference but srcChild is not in latest snapshot
       withCount = (WithCount) srcChild.asReference().getReferredINode();
@@ -902,8 +923,6 @@ public class FSDirectory implements Clos
         final INodeReference.DstReference ref = new INodeReference.DstReference(
             dstIIP.getINode(-2).asDirectory(), withCount,
             dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
-        withCount.setParentReference(ref);
-        withCount.incrementReferenceCount();
         toDst = ref;
       }
 
@@ -941,6 +960,17 @@ public class FSDirectory implements Clos
           // deleted. Need to update the SnapshotManager.
           namesystem.removeSnapshottableDirs(snapshottableDirs);
         }
+        
+        // update the quota usage in src tree
+        if (isSrcInSnapshot) {
+          // get the counts after rename
+          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+          newSrcCounts.subtract(oldSrcCounts);
+          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+              newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
+        }
+        
         return filesDeleted >= 0;
       }
     } finally {
@@ -952,12 +982,14 @@ public class FSDirectory implements Clos
         if (withCount == null) {
           srcChild.setLocalName(srcChildName);
         } else if (!srcChildIsReference) { // src must be in snapshot
+          // the withCount node will no longer be used thus no need to update
+          // its reference number here
           final INode originalChild = withCount.getReferredINode();
           srcChild = originalChild;
         } else {
+          withCount.removeReference(oldSrcChild.asReference());
           final INodeReference originalRef = new INodeReference.DstReference(
               srcParent, withCount, srcRefDstSnapshot);
-          withCount.setParentReference(originalRef);
           srcChild = originalRef;
         }
         
@@ -978,6 +1010,12 @@ public class FSDirectory implements Clos
         } else {
           addLastINodeNoQuotaCheck(dstIIP, removedDst);
         }
+        if (removedDst.isReference()) {
+          final INodeReference removedDstRef = removedDst.asReference();
+          final INodeReference.WithCount wc = 
+              (WithCount) removedDstRef.getReferredINode().asReference();
+          wc.addReference(removedDstRef);
+        }
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -1368,7 +1406,7 @@ public class FSDirectory implements Clos
       Quota.Counts counts = targetNode.cleanSubtree(null, latestSnapshot,
           collectedBlocks, removedINodes);
       parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-          -counts.get(Quota.DISKSPACE), true);
+          -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
       removed = counts.get(Quota.NAMESPACE);
     }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2588,7 +2626,8 @@ public class FSDirectory implements Clos
       }
       
       @Override
-      public Counts computeQuotaUsage(Counts counts, boolean useCache) {
+      public Counts computeQuotaUsage(Counts counts, boolean useCache,
+          int lastSnapshotId) {
         return null;
       }
       

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Apr 24 20:31:06 2013
@@ -749,16 +749,16 @@ public class FSImage implements Closeabl
    * throw QuotaExceededException.
    */
   static void updateCountForQuota(INodeDirectoryWithQuota root) {
-    updateCountForQuotaRecursively(root, new Quota.Counts());
+    updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
   }
   
   private static void updateCountForQuotaRecursively(INodeDirectory dir,
       Quota.Counts counts) {
     final long parentNamespace = counts.get(Quota.NAMESPACE);
     final long parentDiskspace = counts.get(Quota.DISKSPACE);
-    
-    dir.computeQuotaUsage4CurrentDirectory(counts);
 
+    dir.computeQuotaUsage4CurrentDirectory(counts);
+    
     for (INode child : dir.getChildrenList(null)) {
       if (child.isDirectory()) {
         updateCountForQuotaRecursively(child.asDirectory(), counts);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Apr 24 20:31:06 2013
@@ -674,19 +674,18 @@ public class FSImageFormat {
       //reference
       
       final boolean isWithName = in.readBoolean();
-      int dstSnapshotId = Snapshot.INVALID_ID;
-      if (!isWithName) {
-        dstSnapshotId = in.readInt();
-      }
+      // lastSnapshotId for WithName node, dstSnapshotId for DstReference node
+      int snapshotId = in.readInt();
+      
       final INodeReference.WithCount withCount
           = referenceMap.loadINodeReferenceWithCount(isSnapshotINode, in, this);
 
       if (isWithName) {
-        return new INodeReference.WithName(null, withCount, localName);
+          return new INodeReference.WithName(null, withCount, localName,
+              snapshotId);
       } else {
         final INodeReference ref = new INodeReference.DstReference(null,
-            withCount, dstSnapshotId);
-        withCount.setParentReference(ref);
+            withCount, snapshotId);
         return ref;
       }
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Apr 24 20:31:06 2013
@@ -270,6 +270,8 @@ public class FSImageSerialization {
       Preconditions.checkState(ref instanceof INodeReference.DstReference);
       // dst snapshot id
       out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+    } else {
+      out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId());
     }
     
     final INodeReference.WithCount withCount

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Apr 24 20:31:06 2013
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap.Key;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -386,11 +388,17 @@ public abstract class INode implements D
    * Check and add namespace/diskspace consumed to itself and the ancestors.
    * @throws QuotaExceededException if quote is violated.
    */
-  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify)
-      throws QuotaExceededException {
-    final INodeDirectory parentDir = getParent();
-    if (parentDir != null) {
-      parentDir.addSpaceConsumed(nsDelta, dsDelta, verify);
+  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify,
+      int snapshotId) throws QuotaExceededException {
+    if (parent != null) {
+      parent.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+    }
+  }
+
+  public void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta,
+      boolean verify, int snapshotId) throws QuotaExceededException {
+    if (parent != null) {
+      parent.addSpaceConsumedToRenameSrc(nsDelta, dsDelta, verify, snapshotId);
     }
   }
 
@@ -420,11 +428,39 @@ public abstract class INode implements D
   /**
    * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
    * 
+   * With the existence of {@link INodeReference}, the same inode and its
+   * subtree may be referred by multiple {@link WithName} nodes and a
+   * {@link DstReference} node. To avoid circles while quota usage computation,
+   * we have the following rules:
+   * 
+   * <pre>
+   * 1. For a {@link DstReference} node, since the node must be in the current
+   * tree (or has been deleted as the end point of a series of rename 
+   * operations), we compute the quota usage of the referred node (and its 
+   * subtree) in the regular manner, i.e., including every inode in the current
+   * tree and in snapshot copies, as well as the size of diff list.
+   * 
+   * 2. For a {@link WithName} node, since the node must be in a snapshot, we 
+   * only count the quota usage for those nodes that still existed at the 
+   * creation time of the snapshot associated with the {@link WithName} node.
+   * We do not count in the size of the diff list.  
+   * <pre>
+   * 
    * @param counts The subtree counts for returning.
+   * @param useCache Whether to use cached quota usage. Note that 
+   *                 {@link WithName} node never uses cache for its subtree.
+   * @param lastSnapshotId {@link Snapshot#INVALID_ID} indicates the computation
+   *                       is in the current tree. Otherwise the id indicates
+   *                       the computation range for a {@link WithName} node.
    * @return The same objects as the counts parameter.
    */
   public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache);
+      boolean useCache, int lastSnapshotId);
+
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache) {
+    return computeQuotaUsage(counts, useCache, Snapshot.INVALID_ID);
+  }
   
   /**
    * @return null if the local name is null; otherwise, return the local name.

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Apr 24 20:31:06 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap.Key;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
@@ -204,17 +205,28 @@ public class INodeDirectory extends INod
     Preconditions.checkState(i >= 0);
     Preconditions.checkState(oldChild == children.get(i));
     
-    // TODO: the first case may never be hit
     if (oldChild.isReference() && !newChild.isReference()) {
+      // replace the referred inode, e.g., 
+      // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot
+      // TODO: add a unit test for rename + append
       final INode withCount = oldChild.asReference().getReferredINode();
       withCount.asReference().setReferredINode(newChild);
     } else {
+      if (oldChild.isReference()) {
+        // both are reference nodes, e.g., DstReference -> WithName
+        final INodeReference.WithCount withCount = 
+            (WithCount) oldChild.asReference().getReferredINode();
+        withCount.removeReference(oldChild.asReference());
+      }
+      // do the replacement
       final INode removed = children.set(i, newChild);
       Preconditions.checkState(removed == oldChild);
     }
   }
 
-  INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild) {
+  INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild,
+      Snapshot latest) {
+    Preconditions.checkArgument(latest != null);
     if (oldChild instanceof INodeReference.WithName) {
       return (INodeReference.WithName)oldChild;
     }
@@ -227,7 +239,7 @@ public class INodeDirectory extends INod
       withCount = new INodeReference.WithCount(null, oldChild);
     }
     final INodeReference.WithName ref = new INodeReference.WithName(this,
-        withCount, oldChild.getLocalNameBytes());
+        withCount, oldChild.getLocalNameBytes(), latest.getId());
     replaceChild(oldChild, ref);
     return ref;
   }
@@ -415,20 +427,20 @@ public class INodeDirectory extends INod
   }
 
   @Override
-  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (children != null) {
       for (INode child : children) {
-        child.computeQuotaUsage(counts, useCache);
+        child.computeQuotaUsage(counts, useCache, lastSnapshotId);
       }
     }
-
-    return computeQuotaUsage4CurrentDirectory(counts);    
+    return computeQuotaUsage4CurrentDirectory(counts);
   }
-
+  
   /** Add quota usage for this inode excluding children. */
   public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
     counts.add(Quota.NAMESPACE, 1);
-    return counts;    
+    return counts;
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Wed Apr 24 20:31:06 2013
@@ -94,14 +94,14 @@ public class INodeDirectoryWithQuota ext
   }
   
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (useCache && isQuotaSet()) {
       // use cache value
       counts.add(Quota.NAMESPACE, namespace);
       counts.add(Quota.DISKSPACE, diskspace);
     } else {
-      super.computeQuotaUsage(counts, false);
+      super.computeQuotaUsage(counts, false, lastSnapshotId);
     }
     return counts;
   }
@@ -141,7 +141,7 @@ public class INodeDirectoryWithQuota ext
   
   @Override
   public final void addSpaceConsumed(final long nsDelta, final long dsDelta,
-      boolean verify) throws QuotaExceededException {
+      boolean verify, int snapshotId) throws QuotaExceededException {
     if (isQuotaSet()) { 
       // The following steps are important: 
       // check quotas in this inode and all ancestors before changing counts
@@ -152,11 +152,11 @@ public class INodeDirectoryWithQuota ext
         verifyQuota(nsDelta, dsDelta);
       }
       // (2) verify quota and then add count in ancestors 
-      super.addSpaceConsumed(nsDelta, dsDelta, verify);
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
       // (3) add count in this inode
       addSpaceConsumed2Cache(nsDelta, dsDelta);
     } else {
-      super.addSpaceConsumed(nsDelta, dsDelta, verify);
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
     }
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Apr 24 20:31:06 2013
@@ -329,13 +329,32 @@ public class INodeFile extends INodeWith
 
   @Override
   public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean useCache) {
-    counts.add(Quota.NAMESPACE, this instanceof FileWithSnapshot?
-        ((FileWithSnapshot)this).getDiffs().asList().size() + 1: 1);
-    counts.add(Quota.DISKSPACE, diskspaceConsumed());
+      boolean useCache, int lastSnapshotId) {
+    long nsDelta = 1;
+    final long dsDelta;
+    if (this instanceof FileWithSnapshot) {
+      FileDiffList fileDiffList = ((FileWithSnapshot) this).getDiffs();
+      Snapshot last = fileDiffList.getLastSnapshot();
+      List<FileDiff> diffs = fileDiffList.asList();
+
+      if (lastSnapshotId == Snapshot.INVALID_ID || last == null) {
+        nsDelta += diffs.size();
+        dsDelta = diskspaceConsumed();
+      } else if (last.getId() < lastSnapshotId) {
+        dsDelta = computeFileSize(true, false) * getFileReplication();
+      } else {
+        Snapshot s = fileDiffList.searchSnapshotById(lastSnapshotId);
+        dsDelta = diskspaceConsumed(s);      
+      }
+    } else {
+      dsDelta = diskspaceConsumed();
+    }
+    counts.add(Quota.NAMESPACE, nsDelta);
+    counts.add(Quota.DISKSPACE, dsDelta);
     return counts;
   }
 
+
   @Override
   public final Content.CountsMap computeContentSummary(
       final Content.CountsMap countsMap) {
@@ -448,6 +467,14 @@ public class INodeFile extends INodeWith
     // use preferred block size for the last block if it is under construction
     return computeFileSize(true, true) * getBlockReplication();
   }
+
+  public final long diskspaceConsumed(Snapshot lastSnapshot) {
+    if (lastSnapshot != null) {
+      return computeFileSize(lastSnapshot) * getFileReplication(lastSnapshot);
+    } else {
+      return diskspaceConsumed();
+    }
+  }
   
   /**
    * Return the penultimate allocated block for this file.

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Wed Apr 24 20:31:06 2013
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -76,11 +80,10 @@ public abstract class INodeReference ext
     if (!(referred instanceof WithCount)) {
       return -1;
     }
+    
     WithCount wc = (WithCount) referred;
-    if (ref == wc.getParentReference()) {
-      wc.setParent(null);
-    }
-    return ((WithCount)referred).decrementReferenceCount();
+    wc.removeReference(ref);
+    return wc.getReferenceCount();
   }
 
   private INode referred;
@@ -222,7 +225,7 @@ public abstract class INodeReference ext
   }
 
   @Override
-  public final Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+  public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws QuotaExceededException {
     return referred.cleanSubtree(snapshot, prior, collectedBlocks,
@@ -248,8 +251,9 @@ public abstract class INodeReference ext
   }
 
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
-    return referred.computeQuotaUsage(counts, useCache);
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
+    return referred.computeQuotaUsage(counts, useCache, lastSnapshotId);
   }
   
   @Override
@@ -258,12 +262,6 @@ public abstract class INodeReference ext
   }
 
   @Override
-  public final void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify)
-      throws QuotaExceededException {
-    referred.addSpaceConsumed(nsDelta, dsDelta, verify);
-  }
-
-  @Override
   public final long getNsQuota() {
     return referred.getNsQuota();
   }
@@ -302,10 +300,11 @@ public abstract class INodeReference ext
   public int getDstSnapshotId() {
     return Snapshot.INVALID_ID;
   }
-
+  
   /** An anonymous reference with reference count. */
   public static class WithCount extends INodeReference {
-    private int referenceCount = 1;
+    
+    private final List<WithName> withNameList = new ArrayList<WithName>();
     
     public WithCount(INodeReference parent, INode referred) {
       super(parent, referred);
@@ -313,30 +312,88 @@ public abstract class INodeReference ext
       referred.setParentReference(this);
     }
     
-    /** @return the reference count. */
     public int getReferenceCount() {
-      return referenceCount;
+      int count = withNameList.size();
+      if (getParentReference() != null) {
+        count++;
+      }
+      return count;
     }
 
     /** Increment and then return the reference count. */
-    public int incrementReferenceCount() {
-      return ++referenceCount;
+    public void addReference(INodeReference ref) {
+      if (ref instanceof WithName) {
+        withNameList.add((WithName) ref);
+      } else if (ref instanceof DstReference) {
+        setParentReference(ref);
+      }
     }
 
     /** Decrement and then return the reference count. */
-    public int decrementReferenceCount() {
-      return --referenceCount;
+    public void removeReference(INodeReference ref) {
+      if (ref instanceof WithName) {
+        Iterator<INodeReference.WithName> iter = withNameList.iterator();
+        while (iter.hasNext()) {
+          if (iter.next() == ref) {
+            iter.remove();
+            break;
+          }
+        }
+      } else if (ref == getParentReference()) {
+        setParent(null);
+      }
+    }
+    
+    @Override
+    public final void addSpaceConsumed(long nsDelta, long dsDelta,
+        boolean verify, int snapshotId) throws QuotaExceededException {
+      INodeReference parentRef = getParentReference();
+      if (parentRef != null) {
+        parentRef.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+      }
+      addSpaceConsumedToRenameSrc(nsDelta, dsDelta, verify, snapshotId);
+    }
+    
+    @Override
+    public final void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta,
+        boolean verify, int snapshotId) throws QuotaExceededException {
+      if (snapshotId != Snapshot.INVALID_ID) {
+        // sort withNameList based on the lastSnapshotId
+        Collections.sort(withNameList, new Comparator<WithName>() {
+          @Override
+          public int compare(WithName w1, WithName w2) {
+            return w1.lastSnapshotId - w2.lastSnapshotId;
+          }
+        });
+        for (INodeReference.WithName withName : withNameList) {
+          if (withName.getLastSnapshotId() >= snapshotId) {
+            withName.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+            break;
+          }
+        }
+      }
     }
   }
-
+  
   /** A reference with a fixed name. */
   public static class WithName extends INodeReference {
 
     private final byte[] name;
 
-    public WithName(INodeDirectory parent, WithCount referred, byte[] name) {
+    /**
+     * The id of the last snapshot in the src tree when this WithName node was 
+     * generated. When calculating the quota usage of the referred node, only 
+     * the files/dirs existing when this snapshot was taken will be counted for 
+     * this WithName node and propagated along its ancestor path.
+     */
+    private final int lastSnapshotId;
+    
+    public WithName(INodeDirectory parent, WithCount referred, byte[] name,
+        int lastSnapshotId) {
       super(parent, referred);
       this.name = name;
+      this.lastSnapshotId = lastSnapshotId;
+      referred.addReference(this);
     }
 
     @Override
@@ -349,6 +406,36 @@ public abstract class INodeReference ext
       throw new UnsupportedOperationException("Cannot set name: " + getClass()
           + " is immutable.");
     }
+    
+    public int getLastSnapshotId() {
+      return lastSnapshotId;
+    }
+    
+    @Override
+    public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+        boolean useCache, int lastSnapshotId) {
+      Preconditions.checkState(lastSnapshotId == Snapshot.INVALID_ID
+          || this.lastSnapshotId <= lastSnapshotId);
+      final INode referred = this.getReferredINode().asReference()
+          .getReferredINode();
+      // we cannot use cache for the referred node since its cached quota may
+      // have already been updated by changes in the current tree
+      return referred.computeQuotaUsage(counts, false, this.lastSnapshotId);
+    }
+    
+    @Override
+    public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+        BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes)
+        throws QuotaExceededException {
+      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
+          collectedBlocks, removedINodes);
+      INodeReference ref = getReferredINode().getParentReference();
+      if (ref != null) {
+        ref.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+            -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+      }
+      return counts;
+    }
   }
   
   public static class DstReference extends INodeReference {
@@ -373,6 +460,22 @@ public abstract class INodeReference ext
         final int dstSnapshotId) {
       super(parent, referred);
       this.dstSnapshotId = dstSnapshotId;
+      referred.addReference(this);
+    }
+    
+    @Override
+    public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+        BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes)
+        throws QuotaExceededException {
+      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
+          collectedBlocks, removedINodes);
+      if (snapshot != null) {
+        // also need to update quota usage along the corresponding WithName node
+        WithCount wc = (WithCount) getReferredINode();
+        wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
+            -counts.get(Quota.DISKSPACE), true, snapshot.getId());
+      }
+      return counts;
     }
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Wed Apr 24 20:31:06 2013
@@ -89,7 +89,7 @@ public class INodeSymlink extends INodeW
 
   @Override
   public Quota.Counts computeQuotaUsage(Quota.Counts counts,
-      boolean updateCache) {
+      boolean updateCache, int lastSnapshotId) {
     counts.add(Quota.NAMESPACE, 1);
     return counts;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java Wed Apr 24 20:31:06 2013
@@ -68,7 +68,8 @@ abstract class AbstractINodeDiffList<N e
    */
   final Quota.Counts deleteSnapshotDiff(final Snapshot snapshot,
       Snapshot prior, final N currentINode,
-      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
+      throws QuotaExceededException {
     int snapshotIndex = Collections.binarySearch(diffs, snapshot.getId());
     
     Quota.Counts counts = Quota.Counts.newInstance();
@@ -80,6 +81,13 @@ abstract class AbstractINodeDiffList<N e
       } else {
         removed = diffs.remove(0);
         counts.add(Quota.NAMESPACE, 1);
+        // We add 1 to the namespace quota usage since we delete a diff. 
+        // The quota change will be propagated to 
+        // 1) ancestors in the current tree, and 
+        // 2) src tree of any renamed ancestor.
+        // Because for 2) we do not calculate the number of diff for quota 
+        // usage, we need to compensate this diff change for 2)
+        currentINode.addSpaceConsumedToRenameSrc(1, 0, false, snapshot.getId());
         counts.add(removed.destroyDiffAndCollectBlocks(currentINode,
             collectedBlocks, removedINodes));
       }
@@ -91,6 +99,7 @@ abstract class AbstractINodeDiffList<N e
         // combine the to-be-removed diff with its previous diff
         removed = diffs.remove(snapshotIndex);
         counts.add(Quota.NAMESPACE, 1);
+        currentINode.addSpaceConsumedToRenameSrc(1, 0, false, snapshot.getId());
         if (previous.snapshotINode == null) {
           previous.snapshotINode = removed.snapshotINode;
         } else if (removed.snapshotINode != null) {
@@ -108,7 +117,7 @@ abstract class AbstractINodeDiffList<N e
   /** Add an {@link AbstractINodeDiff} for the given snapshot. */
   final D addDiff(Snapshot latest, N currentINode)
       throws QuotaExceededException {
-    currentINode.addSpaceConsumed(1, 0, true);
+    currentINode.addSpaceConsumed(1, 0, true, Snapshot.INVALID_ID);
     return addLast(createDiff(latest, currentINode));
   }
 
@@ -142,6 +151,20 @@ abstract class AbstractINodeDiffList<N e
   }
   
   /**
+   * Search for the snapshot whose id is 1) no larger than the given id, and 2)
+   * most close to the given id
+   */
+  public final Snapshot searchSnapshotById(final int snapshotId) {
+    final int i = Collections.binarySearch(diffs, snapshotId);
+    if (i == -1) {
+      return null;
+    } else {
+      int index = i < 0 ? -i - 2 : i;
+      return diffs.get(index).getSnapshot();
+    }
+  }
+  
+  /**
    * Find the latest snapshot before a given snapshot.
    * @param anchor The returned snapshot must be taken before this given 
    *               snapshot.

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Wed Apr 24 20:31:06 2013
@@ -326,8 +326,10 @@ public class INodeDirectorySnapshottable
             removedINodes);
         INodeDirectory parent = getParent();
         if (parent != null) {
+          // there will not be any WithName node corresponding to the deleted 
+          // snapshot, thus only update the quota usage in the current tree
           parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-              -counts.get(Quota.DISKSPACE), true);
+              -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
         }
       } catch(QuotaExceededException e) {
         LOG.error("BUG: removeSnapshot increases namespace usage.", e);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Wed Apr 24 20:31:06 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap.Key;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -115,11 +116,14 @@ public class INodeDirectoryWithSnapshot 
       Quota.Counts counts = Quota.Counts.newInstance();
       final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
-        if (INodeReference.tryRemoveReference(d) <= 0) {
-          d.computeQuotaUsage(counts, false);
-          d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-        } else {
-          refNodes.add(d.asReference());
+        d.computeQuotaUsage(counts, false);
+        d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        if (d.isReference()) {
+          INodeReference.WithCount wc = 
+              (INodeReference.WithCount) d.asReference().getReferredINode();
+          if (wc.getReferenceCount() > 0) {
+            refNodes.add(d.asReference());
+          }
         }
       }
       deletedList.clear();
@@ -271,25 +275,36 @@ public class INodeDirectoryWithSnapshot 
         @Override
         public void process(INode inode) {
           if (inode != null) {
-            if (INodeReference.tryRemoveReference(inode) <= 0) {
-              inode.computeQuotaUsage(counts, false);
-              inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-            } else {
-              // if the node is a reference node, we should continue the 
-              // snapshot deletion process
-              try {
-                // use null as prior here because we are handling a reference
-                // node stored in the created list of a snapshot diff. This 
-                // snapshot diff must be associated with the latest snapshot of
-                // the dst tree before the rename operation. In this scenario,
-                // the prior snapshot should be the one created in the src tree,
-                // and it can be identified by the cleanSubtree since we call
-                // recordModification before the rename.
-                counts.add(inode.cleanSubtree(posterior.snapshot, null,
-                    collectedBlocks, removedINodes));
-              } catch (QuotaExceededException e) {
-                String error = "should not have QuotaExceededException while deleting snapshot";
-                LOG.error(error, e);
+            inode.computeQuotaUsage(counts, false);
+            inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+
+            boolean handleRef = false;
+            if (inode.isReference()) {
+              INodeReference.WithCount wc = (INodeReference.WithCount) inode
+                  .asReference().getReferredINode();
+              if (wc.getReferenceCount() > 0) {
+                handleRef = true;
+              }
+            }
+
+            if (handleRef) {
+              final Snapshot postSnapshot = posterior.snapshot;
+              if (inode instanceof INodeReference.DstReference) {
+                // we are handling a reference node and its subtree stored in
+                // the created list of a snapshot diff, which must be associated
+                // with the latest snapshot of the dst tree before the rename 
+                // operation.
+                destroyDstSnapshot(inode, postSnapshot, null, collectedBlocks,
+                    removedINodes);
+              } else if (inode instanceof INodeReference.WithName) {
+                // the inode should be renamed again. We only need to delete
+                // postSnapshot in its subtree.
+                try {
+                  inode.cleanSubtree(postSnapshot, null, collectedBlocks,
+                      removedINodes);
+                } catch (QuotaExceededException e) {
+                  LOG.error("Error: should not throw QuotaExceededException", e);
+                }
               }
             }
           }
@@ -297,6 +312,74 @@ public class INodeDirectoryWithSnapshot 
       });
       return counts;
     }
+    
+    /**
+     * For a reference node, delete its first snapshot associated with the dst
+     * tree of a rename operation, i.e., the snapshot diff is associated with
+     * the latest snapshot of the dst tree before the rename operation. The
+     * difference between this process and a regular snapshot deletion
+     * process is that we need to delete everything created after the rename,
+     * i.e., we should destroy the whole created list of the referred node.
+     */
+    private Quota.Counts destroyDstSnapshot(INode inode, final Snapshot snapshot, 
+        Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes) {
+      Quota.Counts counts = Quota.Counts.newInstance();
+      try {
+        if (inode.isReference()) {
+          INodeReference referenceNode = inode.asReference(); 
+          INodeReference.WithCount wc = 
+              (WithCount) referenceNode.getReferredINode();
+          INode referred = wc.getReferredINode();
+          Quota.Counts subCounts = destroyDstSnapshot(referred, snapshot,
+              prior, collectedBlocks, removedINodes);
+          if (inode instanceof INodeReference.WithName) {
+            INodeReference ref = wc.getParentReference();
+            if (ref != null) {
+              ref.addSpaceConsumed(-subCounts.get(Quota.NAMESPACE),
+                  -subCounts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+            }
+          } else if (inode instanceof INodeReference.DstReference) {
+            wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
+                -counts.get(Quota.DISKSPACE), true, snapshot.getId());
+          }
+        } else if (inode.isFile()) { // file and not reference
+          counts.add(inode.cleanSubtree(snapshot, null, collectedBlocks,
+              removedINodes));
+        } else if (inode.isDirectory()) { // directory and not reference
+          if (inode.asDirectory() instanceof INodeDirectoryWithSnapshot) {
+            INodeDirectoryWithSnapshot dirNode = 
+                (INodeDirectoryWithSnapshot) inode.asDirectory();
+            DirectoryDiffList diffList = dirNode.getDiffs();
+            prior = diffList.updatePrior(snapshot, prior);
+            counts.add(diffList.deleteSnapshotDiff(snapshot, prior, dirNode, 
+                collectedBlocks, removedINodes));
+            if (prior != null) {
+              DirectoryDiff priorDiff = diffList.getDiff(prior);
+              if (priorDiff != null) {
+                // destroy everything in the created list!
+                counts.add(priorDiff.diff.destroyCreatedList(dirNode,
+                    collectedBlocks, removedINodes));
+                for (INode dNode : priorDiff.getChildrenDiff().getList(
+                    ListType.DELETED)) {
+                  counts.add(cleanDeletedINode(dNode, snapshot, prior,
+                      collectedBlocks, removedINodes));
+                }
+              }
+            }
+          }
+          Snapshot s = snapshot != null && prior != null ? prior : snapshot;
+          for (INode child : inode.asDirectory().getChildrenList(s)) {
+            counts.add(destroyDstSnapshot(child, s, prior, collectedBlocks,
+                removedINodes));
+          }
+        }
+      } catch (QuotaExceededException e) {
+        String error = "should not have QuotaExceededException while deleting snapshot";
+        LOG.error(error, e);
+      }
+      return counts;
+    }
 
     /**
      * @return The children list of a directory in a snapshot.
@@ -395,22 +478,19 @@ public class INodeDirectoryWithSnapshot 
       for (INodeReference ref : refNodes) {
         // if the node is a reference node, we should continue the 
         // snapshot deletion process
-        try {
-          // Use null as prior snapshot. We are handling a reference node stored
-          // in the delete list of this snapshot diff. We need to destroy this 
-          // snapshot diff because it is the very first one in history.
-          // If the ref node is a WithName instance acting as the src node of
-          // the rename operation, there will not be any snapshot before the
-          // snapshot to be deleted. If the ref node presents the dst node of a 
-          // rename operation, we can identify the corresponding prior snapshot 
-          // when we come into the subtree of the ref node.
-          counts.add(ref.cleanSubtree(this.snapshot, null, collectedBlocks,
-              removedINodes));
-        } catch (QuotaExceededException e) {
-          String error = 
-              "should not have QuotaExceededException while deleting snapshot " 
-              + this.snapshot;
-          LOG.error(error, e);
+        if (ref instanceof INodeReference.DstReference) {
+          // if ref is a DstReference instance, we should delete all the files
+          // created after the rename
+          destroyDstSnapshot(ref, snapshot, null, collectedBlocks,
+              removedINodes);
+        } else if (ref instanceof INodeReference.WithName) {
+          // ref should have been renamed again. We only need to delete
+          // the snapshot in its subtree.
+          try {
+            ref.cleanSubtree(snapshot, null, collectedBlocks, removedINodes);
+          } catch (QuotaExceededException e) {
+            LOG.error("Error: should not throw QuotaExceededException", e);
+          }
         }
       }
       return counts;
@@ -689,7 +769,7 @@ public class INodeDirectoryWithSnapshot 
     if (added && !removeDeletedChild) {
       final Quota.Counts counts = deletedChild.computeQuotaUsage();
       addSpaceConsumed(counts.get(Quota.NAMESPACE),
-          counts.get(Quota.DISKSPACE), false);
+          counts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
     }
   }
 
@@ -796,16 +876,16 @@ public class INodeDirectoryWithSnapshot 
    * @param collectedBlocks Used to collect blocks for later deletion.
    * @return Quota usage update.
    */
-  private Quota.Counts cleanDeletedINode(INode inode, Snapshot post,
+  private static Quota.Counts cleanDeletedINode(INode inode, Snapshot post,
       Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
-      final List<INode> removedINodes) {
+      final List<INode> removedINodes) throws QuotaExceededException {
     Quota.Counts counts = Quota.Counts.newInstance();
     Deque<INode> queue = new ArrayDeque<INode>();
     queue.addLast(inode);
     while (!queue.isEmpty()) {
       INode topNode = queue.pollFirst();
-      if (topNode instanceof FileWithSnapshot) {
-        FileWithSnapshot fs = (FileWithSnapshot) topNode;
+      if (topNode.isFile() && topNode.asFile() instanceof FileWithSnapshot) {
+        FileWithSnapshot fs = (FileWithSnapshot) topNode.asFile();
         counts.add(fs.getDiffs().deleteSnapshotDiff(post, prior,
             topNode.asFile(), collectedBlocks, removedINodes));
       } else if (topNode.isDirectory()) {
@@ -841,12 +921,40 @@ public class INodeDirectoryWithSnapshot 
   }
 
   @Override
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache, int lastSnapshotId) {
+    if ((useCache && isQuotaSet()) || lastSnapshotId == Snapshot.INVALID_ID) {
+      return super.computeQuotaUsage(counts, useCache, lastSnapshotId);
+    }
+    
+    final int diffNum = 0;
+    Snapshot lastSnapshot = null;
+    Snapshot lastInDiff = diffs.getLastSnapshot();
+    // if lastSnapshotId > lastInDiff.getId(), the snapshot diff associated with
+    // lastSnapshotId must have been deleted. We should call 
+    // getChildrenList(null) to get the children list for the continuous 
+    // computation. In the meanwhile, there must be some snapshot diff whose
+    // snapshot id is no less than lastSnapshotId. Otherwise the WithName node
+    // itself should have been deleted.
+    if (lastInDiff != null && lastInDiff.getId() >= lastSnapshotId) {
+      lastSnapshot = diffs.searchSnapshotById(lastSnapshotId);
+    }
+    
+    ReadOnlyList<INode> childrenList = getChildrenList(lastSnapshot);
+    for (INode child : childrenList) {
+      child.computeQuotaUsage(counts, useCache, lastSnapshotId);
+    }
+    
+    counts.add(Quota.NAMESPACE, diffNum + 1);
+    return counts;
+  }
+  
+  @Override
   public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
     super.computeQuotaUsage4CurrentDirectory(counts);
-
     for(DirectoryDiff d : diffs) {
       for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
-        deleted.computeQuotaUsage(counts, false);
+        deleted.computeQuotaUsage(counts, false, Snapshot.INVALID_ID);
       }
     }
     counts.add(Quota.NAMESPACE, diffs.asList().size());

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Wed Apr 24 20:31:06 2013
@@ -125,6 +125,13 @@ public class Snapshot implements Compara
     }
     return latest;
   }
+  
+  static Snapshot read(DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    final int snapshotId = in.readInt();
+    final INode root = loader.loadINodeWithLocalName(false, in);
+    return new Snapshot(snapshotId, root.asDirectory(), null);
+  }
 
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
@@ -205,10 +212,4 @@ public class Snapshot implements Compara
     // write root
     FSImageSerialization.writeINodeDirectory(root, out);
   }
-  
-  static Snapshot read(DataInput in, FSImageFormat.Loader loader) throws IOException {
-    final int snapshotId = in.readInt();
-    final INode root = loader.loadINodeWithLocalName(false, in);
-    return new Snapshot(snapshotId, root.asDirectory(), null);
-  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java Wed Apr 24 20:31:06 2013
@@ -354,7 +354,6 @@ public class SnapshotFSImageFormat {
       } else {
         final long id = in.readLong();
         withCount = referenceMap.get(id);
-        withCount.incrementReferenceCount();
       }
       return withCount;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java?rev=1471647&r1=1471646&r2=1471647&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java Wed Apr 24 20:31:06 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyBoolean;
@@ -61,6 +62,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /** Testing rename with snapshots. */
 public class TestRenameWithSnapshots {
@@ -418,14 +420,16 @@ public class TestRenameWithSnapshots {
     final Path newfoo = new Path(sdir1, "foo");
     hdfs.rename(foo, newfoo);
     
-    final Path bar3 = new Path(newfoo, "bar3");
-    DFSTestUtil.createFile(hdfs, bar3, BLOCKSIZE, REPL, SEED);
+    final Path newbar = new Path(newfoo, bar.getName());
+    final Path newbar2 = new Path(newfoo, bar2.getName());
+    final Path newbar3 = new Path(newfoo, "bar3");
+    DFSTestUtil.createFile(hdfs, newbar3, BLOCKSIZE, REPL, SEED);
     
     hdfs.createSnapshot(sdir1, "s4");
-    hdfs.delete(bar, true);
-    hdfs.delete(bar3, true);
+    hdfs.delete(newbar, true);
+    hdfs.delete(newbar3, true);
     
-    assertFalse(hdfs.exists(bar3));
+    assertFalse(hdfs.exists(newbar3));
     assertFalse(hdfs.exists(bar));
     final Path bar_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
         "foo/bar");
@@ -435,7 +439,7 @@ public class TestRenameWithSnapshots {
     assertTrue(hdfs.exists(bar3_s4));
     
     hdfs.createSnapshot(sdir1, "s5");
-    hdfs.delete(bar2, true);
+    hdfs.delete(newbar2, true);
     assertFalse(hdfs.exists(bar2));
     final Path bar2_s5 = SnapshotTestHelper.getSnapshotPath(sdir1, "s5",
         "foo/bar2");
@@ -527,8 +531,8 @@ public class TestRenameWithSnapshots {
     // dump the namespace loaded from fsimage
     SnapshotTestHelper.dumpTree2File(fsdir, fsnAfter);
     
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, false);
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, false);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, true);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, true);
   }
   
   /**
@@ -832,6 +836,9 @@ public class TestRenameWithSnapshots {
     hdfs.setReplication(bar1_dir2, REPL_1);
     hdfs.setReplication(bar_dir2, REPL_1);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s11");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s22");
@@ -848,6 +855,9 @@ public class TestRenameWithSnapshots {
     hdfs.setReplication(bar1_dir3, REPL_2);
     hdfs.setReplication(bar_dir3, REPL_2);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s111");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s222");
@@ -894,11 +904,14 @@ public class TestRenameWithSnapshots {
     // 3. /dir3/foo -> /dir2/foo
     hdfs.rename(foo_dir3, foo_dir2);
     hdfs.rename(bar_dir3, bar_dir2);
-    
-    // modification on /dir2/foo
+   
+//    // modification on /dir2/foo
     hdfs.setReplication(bar1_dir2, REPL);
     hdfs.setReplication(bar_dir2, REPL);
     
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
     // create snapshots
     SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1111");
     SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2222");
@@ -1386,4 +1399,61 @@ public class TestRenameWithSnapshots {
     assertEquals("s1", fooDiffs.get(0).snapshot.getRoot().getLocalName());
     assertEquals("s3", fooDiffs.get(1).snapshot.getRoot().getLocalName());
   }
+  
+  /**
+   * Test undo where dst node being overwritten is a reference node
+   */
+  @Test
+  public void testRenameUndo_4() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    final Path sdir3 = new Path("/dir3");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    hdfs.mkdirs(sdir3);
+    
+    final Path foo = new Path(sdir1, "foo");
+    final Path bar = new Path(foo, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    
+    final Path foo2 = new Path(sdir2, "foo");
+    hdfs.mkdirs(foo2);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    
+    // rename foo2 to foo3, so that foo3 will be a reference node
+    final Path foo3 = new Path(sdir3, "foo");
+    hdfs.rename(foo2, foo3);
+    
+    INode foo3Node = fsdir.getINode4Write(foo3.toString());
+    assertTrue(foo3Node.isReference());
+    
+    INodeDirectory dir3 = fsdir.getINode4Write(sdir3.toString()).asDirectory();
+    INodeDirectory mockDir3 = spy(dir3);
+    // fail the rename but succeed in undo
+    doReturn(false).when(mockDir3).addChild((INode) Mockito.isNull(),
+        anyBoolean(), (Snapshot) anyObject());
+    Mockito.when(mockDir3.addChild((INode) Mockito.isNotNull(), anyBoolean(),
+        (Snapshot) anyObject())).thenReturn(false).thenCallRealMethod();
+    INodeDirectory root = fsdir.getINode4Write("/").asDirectory();
+    root.replaceChild(dir3, mockDir3);
+    foo3Node.setParent(mockDir3);
+    
+    try {
+      hdfs.rename(foo, foo3, Rename.OVERWRITE);
+      fail("the rename from " + foo + " to " + foo3 + " should fail");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("rename from " + foo + " to "
+          + foo3 + " failed.", e);
+    }
+    
+    // make sure the undo is correct
+    final INode foo3Node_undo = fsdir.getINode4Write(foo3.toString());
+    assertSame(foo3Node, foo3Node_undo);
+    INodeReference.WithCount foo3_wc = (WithCount) foo3Node.asReference()
+        .getReferredINode();
+    assertEquals(2, foo3_wc.getReferenceCount());
+    assertSame(foo3Node, foo3_wc.getParentReference());
+  }
 }