Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/11/19 08:39:46 UTC

svn commit: r1543332 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ src/test/java/org/apache/hadoop/hdfs/ser...

Author: jing9
Date: Tue Nov 19 07:39:46 2013
New Revision: 1543332

URL: http://svn.apache.org/r1543332
Log:
HDFS-5428. Merge change r1543329 from trunk.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 19 07:39:46 2013
@@ -192,6 +192,9 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5502. Fix HTTPS support in HsftpFileSystem. (Haohui Mai via jing9)
 
+    HDFS-5428. Under construction files deletion after snapshot+checkpoint+nn restart 
+    leads to nn safemode. (jing9)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Nov 19 07:39:46 2013
@@ -32,6 +32,7 @@ import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -48,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
@@ -676,6 +678,12 @@ public class FSImageFormat {
           if (underConstruction) {
             clientName = FSImageSerialization.readString(in);
             clientMachine = FSImageSerialization.readString(in);
+            // convert the last block to BlockUC
+            if (blocks != null && blocks.length > 0) {
+              BlockInfo lastBlk = blocks[blocks.length - 1]; 
+              blocks[blocks.length - 1] = new BlockInfoUnderConstruction(
+                  lastBlk, replication);
+            }
           }
         }
       }
@@ -688,10 +696,15 @@ public class FSImageFormat {
       }
       final INodeFile file = new INodeFile(inodeId, localName, permissions,
           modificationTime, atime, blocks, replication, blockSize);
-      return fileDiffs != null? new INodeFileWithSnapshot(file, fileDiffs)
-          : underConstruction? new INodeFileUnderConstruction(
-              file, clientName, clientMachine, null)
-          : file;
+      if (underConstruction) {
+        INodeFileUnderConstruction fileUC = new INodeFileUnderConstruction(
+            file, clientName, clientMachine, null);
+        return fileDiffs == null ? fileUC :
+          new INodeFileUnderConstructionWithSnapshot(fileUC, fileDiffs);
+      } else {
+        return fileDiffs == null ? file : 
+          new INodeFileWithSnapshot(file, fileDiffs);
+      }
     } else if (numBlocks == -1) {
       //directory
       
@@ -819,8 +832,20 @@ public class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        final INodesInPath iip = fsDir.getLastINodeInPath(path);
-        INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path);
+        INodeFile oldnode = null;
+        boolean inSnapshot = false;
+        if (path != null && FSDirectory.isReservedName(path) && 
+            LayoutVersion.supports(Feature.ADD_INODE_ID, getLayoutVersion())) {
+          // TODO: for HDFS-5428, we use a reserved path for those INodeFileUC
+          // in snapshots. If we support INode ID in the layout version, we
+          // can use the inode id to find the oldnode.
+          oldnode = namesystem.dir.getInode(cons.getId()).asFile();
+          inSnapshot = true;
+        } else {
+          final INodesInPath iip = fsDir.getLastINodeInPath(path);
+          oldnode = INodeFile.valueOf(iip.getINode(0), path);
+        }
+        
         cons.setLocalName(oldnode.getLocalNameBytes());
         INodeReference parentRef = oldnode.getParentReference();
         if (parentRef != null) {
@@ -831,11 +856,23 @@ public class FSImageFormat {
 
         if (oldnode instanceof INodeFileWithSnapshot) {
           cons = new INodeFileUnderConstructionWithSnapshot(cons,
-              ((INodeFileWithSnapshot)oldnode).getDiffs());
+              ((INodeFileWithSnapshot) oldnode).getDiffs());
         }
 
-        fsDir.replaceINodeFile(path, oldnode, cons);
-        namesystem.leaseManager.addLease(cons.getClientName(), path); 
+        if (!inSnapshot) {
+          fsDir.replaceINodeFile(path, oldnode, cons);
+          namesystem.leaseManager.addLease(cons.getClientName(), path);
+        } else {
+          if (parentRef != null) {
+            // replace oldnode with cons
+            parentRef.setReferredINode(cons);
+          } else {
+            // replace old node in its parent's children list and deleted list
+            oldnode.getParent().replaceChildFileInSnapshot(oldnode, cons);
+            namesystem.dir.addToInodeMap(cons);
+            updateBlocksMap(cons);
+          }
+        }
       }
     }
 
@@ -906,6 +943,9 @@ public class FSImageFormat {
     /** The MD5 checksum of the file that was written */
     private MD5Hash savedDigest;
     private final ReferenceMap referenceMap = new ReferenceMap();
+    
+    private final Map<Long, INodeFileUnderConstruction> snapshotUCMap = 
+        new HashMap<Long, INodeFileUnderConstruction>();
 
     /** @throws IllegalStateException if the instance has not yet saved an image */
     private void checkSaved() {
@@ -982,14 +1022,22 @@ public class FSImageFormat {
         // save the root
         saveINode2Image(fsDir.rootDir, out, false, referenceMap, counter);
         // save the rest of the nodes
-        saveImage(fsDir.rootDir, out, true, counter);
+        saveImage(fsDir.rootDir, out, true, false, counter);
         prog.endStep(Phase.SAVING_CHECKPOINT, step);
         // Now that the step is finished, set counter equal to total to adjust
         // for possible under-counting due to reference inodes.
         prog.setCount(Phase.SAVING_CHECKPOINT, step,
           fsDir.rootDir.numItemsInTree());
         // save files under construction
-        sourceNamesystem.saveFilesUnderConstruction(out);
+        // TODO: for HDFS-5428, since we cannot break the compatibility of 
+        // fsimage, we store part of the under-construction files that are only
+        // in snapshots in this "under-construction-file" section. As a 
+        // temporary solution, we use "/.reserved/.inodes/<inodeid>" as their 
+        // paths, so that when loading fsimage we do not put them into the lease
+        // map. In the future, we can remove this hack when we can bump the 
+        // layout version.
+        sourceNamesystem.saveFilesUnderConstruction(out, snapshotUCMap);
+        
         context.checkCancelled();
         sourceNamesystem.saveSecretManagerState(out, sdPath);
         context.checkCancelled();
@@ -1012,20 +1060,31 @@ public class FSImageFormat {
      * Save children INodes.
      * @param children The list of children INodes
      * @param out The DataOutputStream to write
+     * @param inSnapshot Whether the parent directory or its ancestor is in 
+     *                   the deleted list of some snapshot (caused by rename or 
+     *                   deletion)
      * @param counter Counter to increment for namenode startup progress
      * @return Number of children that are directory
      */
-    private int saveChildren(ReadOnlyList<INode> children, DataOutputStream out,
-        Counter counter) throws IOException {
+    private int saveChildren(ReadOnlyList<INode> children,
+        DataOutputStream out, boolean inSnapshot, Counter counter)
+        throws IOException {
       // Write normal children INode. 
       out.writeInt(children.size());
       int dirNum = 0;
       int i = 0;
       for(INode child : children) {
         // print all children first
+        // TODO: for HDFS-5428, we cannot change the format/content of fsimage
+        // here, thus even if the parent directory is in a snapshot, we still
+        // serialize INodeUC children the same way as normal children, and
+        // collect them into snapshotUCMap below instead
         saveINode2Image(child, out, false, referenceMap, counter);
         if (child.isDirectory()) {
           dirNum++;
+        } else if (inSnapshot && child.isFile()
+            && child.asFile().isUnderConstruction()) {
+          this.snapshotUCMap.put(child.getId(),
+              (INodeFileUnderConstruction) child.asFile());
         }
         if (i++ % 50 == 0) {
           context.checkCancelled();
@@ -1042,14 +1101,15 @@ public class FSImageFormat {
      * 
      * @param current The current node
     * @param out The DataOutputStream to write the image
-     * @param snapshot The possible snapshot associated with the current node
      * @param toSaveSubtree Whether or not to save the subtree to fsimage. For
      *                      reference node, its subtree may already have been
      *                      saved before.
+     * @param inSnapshot Whether the current directory is in snapshot
      * @param counter Counter to increment for namenode startup progress
      */
     private void saveImage(INodeDirectory current, DataOutputStream out,
-        boolean toSaveSubtree, Counter counter) throws IOException {
+        boolean toSaveSubtree, boolean inSnapshot, Counter counter)
+        throws IOException {
       // write the inode id of the directory
       out.writeLong(current.getId());
       
@@ -1078,7 +1138,7 @@ public class FSImageFormat {
       }
 
       // 3. Write children INode 
-      dirNum += saveChildren(children, out, counter);
+      dirNum += saveChildren(children, out, inSnapshot, counter);
       
       // 4. Write DirectoryDiff lists, if there is any.
       SnapshotFSImageFormat.saveDirectoryDiffList(current, out, referenceMap);
@@ -1093,14 +1153,14 @@ public class FSImageFormat {
         // make sure we only save the subtree under a reference node once
         boolean toSave = child.isReference() ? 
             referenceMap.toProcessSubtree(child.getId()) : true;
-        saveImage(child.asDirectory(), out, toSave, counter);
+        saveImage(child.asDirectory(), out, toSave, inSnapshot, counter);
       }
       if (snapshotDirs != null) {
         for (INodeDirectory subDir : snapshotDirs) {
           // make sure we only save the subtree under a reference node once
           boolean toSave = subDir.getParentReference() != null ? 
               referenceMap.toProcessSubtree(subDir.getId()) : true;
-          saveImage(subDir, out, toSave, counter);
+          saveImage(subDir, out, toSave, true, counter);
         }
       }
     }
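
The loader change above resolves an under-construction entry whose path lies
under the reserved prefix by inode id instead of by path, and does not add it
to the lease map. Below is a minimal, self-contained sketch of that
reserved-path round trip; the class and helper names are hypothetical, and
the constant values are assumed to match FSDirectory.DOT_RESERVED_PATH_PREFIX
and FSDirectory.DOT_INODES_STRING.

public class ReservedInodePathSketch {
  // Assumed to match FSDirectory.DOT_RESERVED_PATH_PREFIX and
  // FSDirectory.DOT_INODES_STRING in this branch.
  static final String RESERVED_PREFIX = "/.reserved";
  static final String INODES = ".inodes";

  // Save side: encode a snapshot-only under-construction file by inode id.
  static String toReservedPath(long inodeId) {
    return RESERVED_PREFIX + "/" + INODES + "/" + inodeId;
  }

  // Load side: detect the reserved form and recover the inode id, so the
  // file can be looked up in the inode map instead of by path.
  static long parseInodeId(String path) {
    final String prefix = RESERVED_PREFIX + "/" + INODES + "/";
    if (!path.startsWith(prefix)) {
      throw new IllegalArgumentException("not a reserved inode path: " + path);
    }
    return Long.parseLong(path.substring(prefix.length()));
  }

  public static void main(String[] args) {
    String p = toReservedPath(16386L);
    System.out.println(p);               // /.reserved/.inodes/16386
    System.out.println(parseInodeId(p)); // 16386
  }
}

Because such a path can never name a live file in the current tree, the
loader can safely treat it as a marker and skip the lease registration.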

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 19 07:39:46 2013
@@ -5891,19 +5891,40 @@ public class FSNamesystem implements Nam
   /**
    * Serializes leases. 
    */
-  void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
+  void saveFilesUnderConstruction(DataOutputStream out,
+      Map<Long, INodeFileUnderConstruction> snapshotUCMap) throws IOException {
     // This is run by an inferior thread of saveNamespace, which holds a read
     // lock on our behalf. If we took the read lock here, we could block
     // for fairness if a writer is waiting on the lock.
     synchronized (leaseManager) {
       Map<String, INodeFileUnderConstruction> nodes =
           leaseManager.getINodesUnderConstruction();
-      out.writeInt(nodes.size()); // write the size    
+      for (Map.Entry<String, INodeFileUnderConstruction> entry
+          : nodes.entrySet()) {
+        // TODO: for HDFS-5428, because of rename operations, some
+        // under-construction files that are in the current fs directory can
+        // also be captured in the snapshotUCMap. We should remove them from
+        // the snapshotUCMap.
+        snapshotUCMap.remove(entry.getValue().getId());
+      }
+      
+      out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size    
       for (Map.Entry<String, INodeFileUnderConstruction> entry
            : nodes.entrySet()) {
         FSImageSerialization.writeINodeUnderConstruction(
             out, entry.getValue(), entry.getKey());
       }
+      for (Map.Entry<Long, INodeFileUnderConstruction> entry
+          : snapshotUCMap.entrySet()) {
+        // for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
+        // as their paths
+        StringBuilder b = new StringBuilder();
+        b.append(FSDirectory.DOT_RESERVED_PATH_PREFIX)
+            .append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
+            .append(Path.SEPARATOR).append(entry.getValue().getId());
+        FSImageSerialization.writeINodeUnderConstruction(
+            out, entry.getValue(), b.toString());
+      }
     }
   }
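
Since the entry count written first must match the number of records that
follow, entries already covered by the lease map are removed from
snapshotUCMap before the count is taken. A small standalone sketch of that
dedup-then-merge step, with simplified value types standing in for the real
INodeFileUnderConstruction:

import java.util.HashMap;
import java.util.Map;

public class UCMergeSketch {
  public static void main(String[] args) {
    Map<String, Long> leaseMap = new HashMap<>();      // path -> inode id
    Map<Long, String> snapshotUCMap = new HashMap<>(); // inode id -> name

    leaseMap.put("/test/test/test2", 16386L);
    snapshotUCMap.put(16386L, "test2"); // same file, also seen via a snapshot
    snapshotUCMap.put(16390L, "test3"); // file now existing only in a snapshot

    // Mirror the loop over nodes.entrySet() above: lease entries win.
    for (long id : leaseMap.values()) {
      snapshotUCMap.remove(id);
    }
    // The size written to the image is the merged, de-duplicated total.
    System.out.println(leaseMap.size() + snapshotUCMap.size()); // 2
  }
}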
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Tue Nov 19 07:39:46 2013
@@ -204,7 +204,24 @@ public class INodeDirectory extends INod
     clear();
     return newDir;
   }
-
+  
+  /**
+   * Used when loading a fileUC from fsimage. The file to be replaced may
+   * exist only in a snapshot, thus may not be contained in the children list.
+   * See HDFS-5428 for details.
+   */
+  public void replaceChildFileInSnapshot(INodeFile oldChild,
+      final INodeFile newChild) {
+    if (children != null) {
+      final int i = searchChildren(newChild.getLocalNameBytes());
+      if (i >= 0 && children.get(i).getId() == oldChild.getId()) {
+        // no need to consider reference node here, since we already do the 
+        // replacement in FSImageFormat.Loader#loadFilesUnderConstruction
+        children.set(i, newChild);
+      }
+    }
+  }
+  
   /** Replace the given child with a new child. */
   public void replaceChild(INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
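
The id comparison in replaceChildFileInSnapshot matters because a name match
alone is ambiguous: after a rename, a different inode can occupy the same
name slot in the children list. A standalone sketch of the guard, using a
hypothetical Node class in place of real INodes:

import java.util.ArrayList;
import java.util.List;

public class ReplaceChildSketch {
  static class Node {
    final long id; final String name;
    Node(long id, String name) { this.id = id; this.name = name; }
  }

  static void replaceChildFileInSnapshot(List<Node> children,
      Node oldChild, Node newChild) {
    for (int i = 0; i < children.size(); i++) {
      if (children.get(i).name.equals(newChild.name)) {
        // Identity check: replace only if this really is the old inode.
        if (children.get(i).id == oldChild.id) {
          children.set(i, newChild);
        }
        return;
      }
    }
  }

  public static void main(String[] args) {
    List<Node> children = new ArrayList<>();
    children.add(new Node(42L, "test2")); // unrelated inode reusing the name
    Node old = new Node(7L, "test2");     // snapshot-only file being replaced
    replaceChildFileInSnapshot(children, old, new Node(7L, "test2"));
    System.out.println(children.get(0).id); // 42: no false replacement
  }
}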

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Tue Nov 19 07:39:46 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
@@ -75,7 +76,7 @@ public class INodeDirectoryWithSnapshot 
         final INode oldChild, final INode newChild) {
       final List<INode> list = getList(type); 
       final int i = search(list, oldChild.getLocalNameBytes());
-      if (i < 0) {
+      if (i < 0 || list.get(i).getId() != oldChild.getId()) {
         return false;
       }
 
@@ -593,6 +594,14 @@ public class INodeDirectoryWithSnapshot 
   }
   
   @Override
+  public void replaceChildFileInSnapshot(final INodeFile oldChild,
+      final INodeFile newChild) {
+    super.replaceChildFileInSnapshot(oldChild, newChild);
+    diffs.replaceChild(ListType.DELETED, oldChild, newChild);
+    diffs.replaceChild(ListType.CREATED, oldChild, newChild);
+  }
+  
+  @Override
   public void replaceChild(final INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
     super.replaceChild(oldChild, newChild, inodeMap);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java?rev=1543332&r1=1543331&r2=1543332&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java Tue Nov 19 07:39:46 2013
@@ -28,6 +28,11 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.security.AccessControlException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,6 +81,47 @@ public class TestOpenFilesWithSnapshot {
     cluster.restartNameNode();
   }
 
+  @Test
+  public void testWithCheckpoint() throws Exception {
+    Path path = new Path("/test");
+    doWriteAndAbort(fs, path);
+    fs.delete(new Path("/test/test"), true);
+    NameNode nameNode = cluster.getNameNode();
+    NameNodeAdapter.enterSafeMode(nameNode, false);
+    NameNodeAdapter.saveNamespace(nameNode);
+    NameNodeAdapter.leaveSafeMode(nameNode);
+    cluster.restartNameNode(true);
+    
+    // read snapshot file after restart
+    String test2snapshotPath = Snapshot.getSnapshotPath(path.toString(),
+        "s1/test/test2");
+    DFSTestUtil.readFile(fs, new Path(test2snapshotPath));
+    String test3snapshotPath = Snapshot.getSnapshotPath(path.toString(),
+        "s1/test/test3");
+    DFSTestUtil.readFile(fs, new Path(test3snapshotPath));
+  }
+
+  @Test
+  public void testFilesDeletionWithCheckpoint() throws Exception {
+    Path path = new Path("/test");
+    doWriteAndAbort(fs, path);
+    fs.delete(new Path("/test/test/test2"), true);
+    fs.delete(new Path("/test/test/test3"), true);
+    NameNode nameNode = cluster.getNameNode();
+    NameNodeAdapter.enterSafeMode(nameNode, false);
+    NameNodeAdapter.saveNamespace(nameNode);
+    NameNodeAdapter.leaveSafeMode(nameNode);
+    cluster.restartNameNode(true);
+    
+    // read snapshot file after restart
+    String test2snapshotPath = Snapshot.getSnapshotPath(path.toString(),
+        "s1/test/test2");
+    DFSTestUtil.readFile(fs, new Path(test2snapshotPath));
+    String test3snapshotPath = Snapshot.getSnapshotPath(path.toString(),
+        "s1/test/test3");
+    DFSTestUtil.readFile(fs, new Path(test3snapshotPath));
+  }
+
   private void doWriteAndAbort(DistributedFileSystem fs, Path path)
       throws IOException {
     fs.mkdirs(path);
@@ -110,4 +156,55 @@ public class TestOpenFilesWithSnapshot {
     DFSTestUtil.abortStream((DFSOutputStream) out2.getWrappedStream());
     fs.createSnapshot(path, "s1");
   }
+
+  @Test
+  public void testOpenFilesWithMultipleSnapshots() throws Exception {
+    doTestMultipleSnapshots(true);
+  }
+
+  @Test
+  public void testOpenFilesWithMultipleSnapshotsWithoutCheckpoint()
+      throws Exception {
+    doTestMultipleSnapshots(false);
+  }
+
+  private void doTestMultipleSnapshots(boolean saveNamespace)
+      throws IOException, AccessControlException {
+    Path path = new Path("/test");
+    doWriteAndAbort(fs, path);
+    fs.createSnapshot(path, "s2");
+    fs.delete(new Path("/test/test"), true);
+    fs.deleteSnapshot(path, "s2");
+    if (saveNamespace) {
+      NameNode nameNode = cluster.getNameNode();
+      NameNodeAdapter.enterSafeMode(nameNode, false);
+      NameNodeAdapter.saveNamespace(nameNode);
+      NameNodeAdapter.leaveSafeMode(nameNode);
+    }
+    cluster.restartNameNode(true);
+  }
+  
+  @Test
+  public void testOpenFilesWithRename() throws Exception {
+    Path path = new Path("/test");
+    doWriteAndAbort(fs, path);
+
+    // check for zero sized blocks
+    Path fileWithEmptyBlock = new Path("/test/test/test4");
+    fs.create(fileWithEmptyBlock);
+    NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
+    String clientName = fs.getClient().getClientName();
+    // create one empty block
+    nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
+        INodeId.GRANDFATHER_INODE_ID, null);
+    fs.createSnapshot(path, "s2");
+
+    fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));
+    fs.delete(new Path("/test/test-renamed"), true);
+    NameNode nameNode = cluster.getNameNode();
+    NameNodeAdapter.enterSafeMode(nameNode, false);
+    NameNodeAdapter.saveNamespace(nameNode);
+    NameNodeAdapter.leaveSafeMode(nameNode);
+    cluster.restartNameNode(true);
+  }
 }
\ No newline at end of file
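
The snapshot paths read back by these tests follow the usual ".snapshot"
layout; a tiny sketch, assuming the convention behind
Snapshot.getSnapshotPath:

public class SnapshotPathSketch {
  // Assumed shape: snapshottable dir, then ".snapshot", then the
  // snapshot-relative path.
  static String getSnapshotPath(String snapshottableDir, String relative) {
    return snapshottableDir + "/.snapshot/" + relative;
  }

  public static void main(String[] args) {
    // The restarted NameNode must still serve this path even though
    // /test/test/test2 itself was deleted before the checkpoint.
    System.out.println(getSnapshotPath("/test", "s1/test/test2"));
    // -> /test/.snapshot/s1/test/test2
  }
}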