You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC

svn commit: r1152295 [7/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java Fri Jul 29 16:28:45 2011
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.*;
 import junit.framework.Assert;
 import java.io.*;
-import java.net.URI;
-import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -28,9 +27,9 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -47,20 +46,6 @@ public class TestCheckPointForSecurityTo
   short replication = 3;
   MiniDFSCluster cluster = null;
 
-  NameNode startNameNode( Configuration conf,
-                          String imageDirs,
-                          String editsDirs,
-                          StartupOption start) throws IOException {
-    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");  
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, imageDirs);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, editsDirs);
-    String[] args = new String[]{start.getName()};
-    NameNode nn = NameNode.createNameNode(args, conf);
-    Assert.assertTrue(nn.isInSafeMode());
-    return nn;
-  }
-  
   private void cancelToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
     cluster.getNamesystem().cancelDelegationToken(token);
@@ -95,10 +80,12 @@ public class TestCheckPointForSecurityTo
       String[] args = new String[]{"-saveNamespace"};
 
       // verify that the edits file is NOT empty
-      Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
-      for(URI uri : editsDirs) {
-        File ed = new File(uri.getPath());
-        Assert.assertTrue(new File(ed, "current/edits").length() > Integer.SIZE/Byte.SIZE);
+      NameNode nn = cluster.getNameNode();
+      for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
+        FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd);
+        assertTrue(log.isInProgress());
+        assertEquals("In-progress log " + log + " should have 5 transactions",
+            5, log.validateLog().numTransactions);
       }
 
       // Saving image in safe mode should succeed
@@ -108,10 +95,12 @@ public class TestCheckPointForSecurityTo
       } catch(Exception e) {
         throw new IOException(e.getMessage());
       }
-      // verify that the edits file is empty
-      for(URI uri : editsDirs) {
-        File ed = new File(uri.getPath());
-        Assert.assertTrue(new File(ed, "current/edits").length() == Integer.SIZE/Byte.SIZE);
+      // verify that the edits file is empty except for the START txn
+      for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
+        FoundEditLog log = FSImageTestUtil.findLatestEditsLog(sd);
+        assertTrue(log.isInProgress());
+        assertEquals("In-progress log " + log + " should only have START txn",
+            1, log.validateLog().numTransactions);
       }
 
       // restart cluster

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Fri Jul 29 16:28:45 2011
@@ -17,45 +17,84 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import junit.framework.TestCase;
-import java.io.*;
 import java.net.InetSocketAddress;
+import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
 
 /**
  * This class tests the creation and validation of a checkpoint.
  */
 public class TestCheckpoint extends TestCase {
+
+  static {
+    ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final Log LOG = LogFactory.getLog(TestCheckpoint.class); 
+  
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
   static final int fileSize = 8192;
   static final int numDatanodes = 3;
   short replication = 3;
+    
+  @Override
+  public void setUp() throws IOException {
+    FileUtil.fullyDeleteContents(new File(MiniDFSCluster.getBaseDirectory()));
+    ErrorSimulator.initializeErrorSimulationEvent(5);
+  }
 
   static void writeFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
@@ -85,136 +124,93 @@ public class TestCheckpoint extends Test
     assertTrue(!fileSys.exists(name));
   }
 
-  /**
-   * put back the old namedir
-   */
-  private void resurrectNameDir(File namedir) 
-    throws IOException {
-    String parentdir = namedir.getParent();
-    String name = namedir.getName();
-    File oldname =  new File(parentdir, name + ".old");
-    if (!oldname.renameTo(namedir)) {
-      assertTrue(false);
-    }
-  }
-
-  /**
-   * remove one namedir
-   */
-  private void removeOneNameDir(File namedir) 
-    throws IOException {
-    String parentdir = namedir.getParent();
-    String name = namedir.getName();
-    File newname =  new File(parentdir, name + ".old");
-    if (!namedir.renameTo(newname)) {
-      assertTrue(false);
-    }
-  }
-
   /*
    * Verify that namenode does not startup if one namedir is bad.
    */
-  private void testNamedirError(Configuration conf, Collection<URI> namedirs) 
-    throws IOException {
-    System.out.println("Starting testNamedirError");
-    MiniDFSCluster cluster = null;
-
-    if (namedirs.size() <= 1) {
-      return;
-    }
+  public void testNameDirError() throws IOException {
+    LOG.info("Starting testNameDirError");
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
     
-    //
-    // Remove one namedir & Restart cluster. This should fail.
-    //
-    File first = new File(namedirs.iterator().next().getPath());
-    removeOneNameDir(first);
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.shutdown();
-      assertTrue(false);
-    } catch (Throwable t) {
-      // no nothing
-    }
-    resurrectNameDir(first); // put back namedir
-  }
-
-  /**
-   * Tests EditLogFileOutputStream doesn't throw NullPointerException on being
-   * closed twice.
-   * See https://issues.apache.org/jira/browse/HDFS-2011
-   */
-  public void testEditLogFileOutputStreamCloses()
-    throws IOException,NullPointerException {
-    System.out.println("Testing EditLogFileOutputStream doesn't throw " +
-                       "NullPointerException on being closed twice");
-    File editLogStreamFile = null;
-    try {
-      editLogStreamFile = new File(System.getProperty("test.build.data","/tmp"),
-                                   "editLogStream.dat");
-      EditLogFileOutputStream editLogStream =
-                             new EditLogFileOutputStream(editLogStreamFile, 0);
-      editLogStream.close();
-      //Closing an twice should not throw a NullPointerException
-      editLogStream.close();
-    } finally {
-      if (editLogStreamFile != null)
-        // Cleanup the editLogStream.dat file we created
-          editLogStreamFile.delete();
+    Collection<URI> nameDirs = cluster.getNameDirs(0);
+    cluster.shutdown();
+    cluster = null;
+    
+    for (URI nameDirUri : nameDirs) {
+      File dir = new File(nameDirUri.getPath());
+      
+      try {
+        // Simulate the mount going read-only
+        dir.setWritable(false);
+        cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(0)
+          .format(false)
+          .build();
+        fail("NN should have failed to start with " + dir + " set unreadable");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "storage directory does not exist or is not accessible",
+            ioe);
+      } finally {
+        if (cluster != null) {
+          cluster.shutdown();
+          cluster = null;
+        }
+        dir.setWritable(true);
+      }
     }
-    System.out.println("Successfully tested EditLogFileOutputStream doesn't " +
-           "throw NullPointerException on being closed twice");
   }
 
   /**
-   * Checks that an IOException in NNStorage.setCheckpointTimeInStorage is handled
+   * Checks that an IOException in NNStorage.writeTransactionIdFile is handled
    * correctly (by removing the storage directory)
    * See https://issues.apache.org/jira/browse/HDFS-2011
    */
-  public void testSetCheckpointTimeInStorageHandlesIOException() throws Exception {
-    System.out.println("Check IOException handled correctly by setCheckpointTimeInStorage");
-    NNStorage nnStorage = new NNStorage(new HdfsConfiguration());
+  public void testWriteTransactionIdHandlesIOE() throws Exception {
+    LOG.info("Check IOException handled correctly by writeTransactionIdFile");
     ArrayList<URI> fsImageDirs = new ArrayList<URI>();
     ArrayList<URI> editsDirs = new ArrayList<URI>();
     File filePath =
       new File(System.getProperty("test.build.data","/tmp"), "storageDirToCheck");
     assertTrue("Couldn't create directory storageDirToCheck",
                filePath.exists() || filePath.mkdirs());
+    fsImageDirs.add(filePath.toURI());
+    editsDirs.add(filePath.toURI());
+    NNStorage nnStorage = new NNStorage(new HdfsConfiguration(),
+      fsImageDirs, editsDirs);
     try {
-      fsImageDirs.add(filePath.toURI());
-      editsDirs.add(filePath.toURI());
-      // Initialize NNStorage
-      nnStorage.setStorageDirectories(fsImageDirs, editsDirs);
       assertTrue("List of storage directories didn't have storageDirToCheck.",
                  nnStorage.getEditsDirectories().iterator().next().
                  toString().indexOf("storageDirToCheck") != -1);
       assertTrue("List of removed storage directories wasn't empty",
                  nnStorage.getRemovedStorageDirs().isEmpty());
     } finally {
-      // Delete storage directory to cause IOException in setCheckpointTimeInStorage
+      // Delete storage directory to cause IOException in writeTransactionIdFile 
       assertTrue("Couldn't remove directory " + filePath.getAbsolutePath(),
                  filePath.delete());
     }
-    // Just call setCheckpointTimeInStorage using any random number
-    nnStorage.setCheckpointTimeInStorage(1);
+    // Just call writeTransactionIdFile using any random number
+    nnStorage.writeTransactionIdFileToStorage(1);
     List<StorageDirectory> listRsd = nnStorage.getRemovedStorageDirs();
     assertTrue("Removed directory wasn't what was expected",
                listRsd.size() > 0 && listRsd.get(listRsd.size() - 1).getRoot().
                toString().indexOf("storageDirToCheck") != -1);
-    System.out.println("Successfully checked IOException is handled correctly "
-                       + "by setCheckpointTimeInStorage");
   }
 
   /*
    * Simulate namenode crashing after rolling edit log.
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError1(Configuration conf)
+  public void testSecondaryNamenodeError1()
     throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 1");
+    LOG.info("Starting testSecondaryNamenodeError1");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointxx.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -248,27 +244,9 @@ public class TestCheckpoint extends Test
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 2");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
                                               .format(false).build();
     cluster.waitActive();
-    // Also check that the edits file is empty here
-    // and that temporary checkpoint files are gone.
-    FSImage image = cluster.getNameNode().getFSImage();
-    for (Iterator<StorageDirectory> it = 
-           image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      assertFalse(image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE_NEW).exists());
-    }
-    for (Iterator<StorageDirectory> it = 
-           image.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      assertFalse(image.getStorage().getEditNewFile(sd).exists());
-      File edits = image.getStorage().getEditFile(sd);
-      assertTrue(edits.exists()); // edits should exist and be empty
-      long editsLen = edits.length();
-      assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
-    }
     
     fileSys = cluster.getFileSystem();
     try {
@@ -287,13 +265,13 @@ public class TestCheckpoint extends Test
    * Simulate a namenode crash after uploading new image
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError2(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 21");
+  public void testSecondaryNamenodeError2() throws IOException {
+    LOG.info("Starting testSecondaryNamenodeError2");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointyy.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -327,7 +305,6 @@ public class TestCheckpoint extends Test
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 22");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
@@ -347,13 +324,13 @@ public class TestCheckpoint extends Test
    * Simulate a secondary namenode crash after rolling the edit log.
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError3(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 31");
+  public void testSecondaryNamenodeError3() throws IOException {
+    LOG.info("Starting testSecondaryNamenodeError3");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointzz.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
 
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
@@ -396,7 +373,6 @@ public class TestCheckpoint extends Test
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the twice-rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 32");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
@@ -418,24 +394,22 @@ public class TestCheckpoint extends Test
    * Used to truncate primary fsimage file.
    */
   @SuppressWarnings("deprecation")
-  void testSecondaryFailsToReturnImage(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryFailsToReturnImage");
+  public void testSecondaryFailsToReturnImage() throws IOException {
+    LOG.info("Starting testSecondaryFailsToReturnImage");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointRI.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     FSImage image = cluster.getNameNode().getFSImage();
     try {
       assertTrue(!fileSys.exists(file1));
-      StorageDirectory sd = null;
-      for (Iterator<StorageDirectory> it = 
-                image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
-         sd = it.next();
-      assertTrue(sd != null);
-      long fsimageLength = image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE).length();
+      StorageDirectory sd = image.getStorage().getStorageDir(0);
+      
+      File latestImageBeforeCheckpoint = FSImageTestUtil.findLatestImageFile(sd);
+      long fsimageLength = latestImageBeforeCheckpoint.length();
       //
       // Make the checkpoint
       //
@@ -444,18 +418,21 @@ public class TestCheckpoint extends Test
 
       try {
         secondary.doCheckpoint();  // this should fail
-        assertTrue(false);
+        fail("Checkpoint succeeded even though we injected an error!");
       } catch (IOException e) {
-        System.out.println("testSecondaryFailsToReturnImage: doCheckpoint() " +
-            "failed predictably - " + e);
+        // check that it's the injected exception
+        GenericTestUtils.assertExceptionContains(
+            "If this exception is not caught", e);
       }
       ErrorSimulator.clearErrorSimulation(2);
 
       // Verify that image file sizes did not change.
-      for (Iterator<StorageDirectory> it = 
-              image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue(image.getStorage().getStorageFile(it.next(), 
-                                NameNodeFile.IMAGE).length() == fsimageLength);
+      for (StorageDirectory sd2 :
+        image.getStorage().dirIterable(NameNodeDirType.IMAGE)) {
+        
+        File thisNewestImage = FSImageTestUtil.findLatestImageFile(sd2);
+        long len = thisNewestImage.length();
+        assertEquals(fsimageLength, len);
       }
 
       secondary.shutdown();
@@ -466,17 +443,41 @@ public class TestCheckpoint extends Test
   }
 
   /**
-   * Simulate namenode failing to send the whole file
-   * secondary namenode sometimes assumed it received all of it
+   * Simulate 2NN failing to send the whole file (error type 3)
+   * The length header in the HTTP transfer should prevent
+   * this from corrupting the NN.
+   */
+  public void testNameNodeImageSendFailWrongSize()
+      throws IOException {
+    LOG.info("Starting testNameNodeImageSendFailWrongSize");
+    doSendFailTest(3, "is not of the advertised size");
+  }
+
+  /**
+   * Simulate 2NN sending a corrupt image (error type 4)
+   * The digest header in the HTTP transfer should prevent
+   * this from corrupting the NN.
+   */
+  public void testNameNodeImageSendFailWrongDigest()
+      throws IOException {
+    LOG.info("Starting testNameNodeImageSendFailWrongDigest");
+    doSendFailTest(4, "does not match advertised digest");
+  }
+
+  /**
+   * Run a test where the 2NN runs into some kind of error when
+   * sending the checkpoint back to the NN.
+   * @param errorType the ErrorSimulator type to trigger
+   * @param exceptionSubstring an expected substring of the triggered exception
    */
   @SuppressWarnings("deprecation")
-  void testNameNodeImageSendFail(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testNameNodeImageSendFail");
-    Path file1 = new Path("checkpointww.dat");
+  private void doSendFailTest(int errorType, String exceptionSubstring)
+      throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    Path file1 = new Path("checkpoint-doSendFailTest-" + errorType + ".dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -485,21 +486,21 @@ public class TestCheckpoint extends Test
       // Make the checkpoint fail after rolling the edit log.
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      ErrorSimulator.setErrorSimulation(3);
+      ErrorSimulator.setErrorSimulation(errorType);
 
       try {
         secondary.doCheckpoint();  // this should fail
         fail("Did not get expected exception");
       } catch (IOException e) {
         // We only sent part of the image. Have to trigger this exception
-        assertTrue(e.getMessage().contains("is not of the advertised size"));
+        GenericTestUtils.assertExceptionContains(exceptionSubstring, e);
       }
-      ErrorSimulator.clearErrorSimulation(3);
+      ErrorSimulator.clearErrorSimulation(errorType);
       secondary.shutdown(); // secondary namenode crash!
 
       // start new instance of secondary and verify that 
-      // a new rollEditLog suceedes inspite of the fact that 
-      // edits.new already exists.
+      // a new rollEditLog succedes in spite of the fact that we had
+      // a partially failed checkpoint previously.
       //
       secondary = startSecondaryNameNode(conf);
       secondary.doCheckpoint();  // this should work correctly
@@ -515,184 +516,249 @@ public class TestCheckpoint extends Test
       cluster.shutdown();
     }
   }
+  
   /**
-   * Test different startup scenarios.
-   * <p><ol>
-   * <li> Start of primary name-node in secondary directory must succeed. 
-   * <li> Start of secondary node when the primary is already running in 
-   *      this directory must fail.
-   * <li> Start of primary name-node if secondary node is already running in 
-   *      this directory must fail.
-   * <li> Start of two secondary nodes in the same directory must fail.
-   * <li> Import of a checkpoint must fail if primary 
-   * directory contains a valid image.
-   * <li> Import of the secondary image directory must succeed if primary 
-   * directory does not exist.
-   * <li> Recover failed checkpoint for secondary node.
-   * <li> Complete failed checkpoint for secondary node.
-   * </ol>
-   */
-  @SuppressWarnings("deprecation")
-  void testStartup(Configuration conf) throws IOException {
-    System.out.println("Startup of the name-node in the checkpoint directory.");
-    String primaryDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
-    String primaryEditsDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
-    String checkpointDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
-    String checkpointEditsDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
-    NameNode nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
-                                 StartupOption.REGULAR);
+   * Test that the NN locks its storage and edits directories, and won't start up
+   * if the directories are already locked
+   **/
+  public void testNameDirLocking() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    
+    // Start a NN, and verify that lock() fails in all of the configured
+    // directories
+    StorageDirectory savedSd = null;
+    try {
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(null)) {
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+    assertNotNull(savedSd);
+    
+    // Lock one of the saved directories, then start the NN, and make sure it
+    // fails to start
+    assertClusterStartFailsWhenDirLocked(conf, savedSd);
+  }
 
-    // Starting secondary node in the same directory as the primary
-    System.out.println("Startup of secondary in the same dir as the primary.");
+  /**
+   * Test that, if the edits dir is separate from the name dir, it is
+   * properly locked.
+   **/
+  public void testSeparateEditsDirLocking() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+        "/testSeparateEditsDirLocking");
+    
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        editsDir.getAbsolutePath());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .manageNameDfsDirs(false)
+      .numDataNodes(0)
+      .build();
+    
+    // Start a NN, and verify that lock() fails in all of the configured
+    // directories
+    StorageDirectory savedSd = null;
+    try {
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+        assertEquals(editsDir.getAbsoluteFile(), sd.getRoot());
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+    assertNotNull(savedSd);
+    
+    // Lock one of the saved directories, then start the NN, and make sure it
+    // fails to start
+    assertClusterStartFailsWhenDirLocked(conf, savedSd);
+  }
+  
+  /**
+   * Test that the SecondaryNameNode properly locks its storage directories.
+   */
+  @SuppressWarnings("deprecation")
+  public void testSecondaryNameNodeLocking() throws Exception {
+    // Start a primary NN so that the secondary will start successfully
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    
     SecondaryNameNode secondary = null;
     try {
+      StorageDirectory savedSd = null;
+      // Start a secondary NN, then make sure that all of its storage
+      // dirs got locked.
       secondary = startSecondaryNameNode(conf);
-      assertFalse(secondary.getFSImage().getStorage().isLockSupported(0));
+      
+      NNStorage storage = secondary.getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(null)) {
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+      LOG.info("===> Shutting down first 2NN");
       secondary.shutdown();
-    } catch (IOException e) { // expected to fail
-      assertTrue(secondary == null);
-    }
-    nn.stop(); nn = null;
+      secondary = null;
 
-    // Starting primary node in the same directory as the secondary
-    System.out.println("Startup of primary in the same dir as the secondary.");
-    // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.REGULAR);
-    boolean succeed = false;
-    do {
+      LOG.info("===> Locking a dir, starting second 2NN");
+      // Lock one of its dirs, make sure it fails to start
+      LOG.info("Trying to lock" + savedSd);
+      savedSd.lock();
       try {
         secondary = startSecondaryNameNode(conf);
-        succeed = true;
-      } catch(IOException ie) { // keep trying
-        System.out.println("Try again: " + ie.getLocalizedMessage());
-      }
-    } while(!succeed);
-    nn.stop(); nn = null;
-    try {
-      nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
-                          StartupOption.REGULAR);
-      assertFalse(nn.getFSImage().getStorage().isLockSupported(0));
-      nn.stop(); nn = null;
-    } catch (IOException e) { // expected to fail
-      assertTrue(nn == null);
-    }
-
-    // Try another secondary in the same directory
-    System.out.println("Startup of two secondaries in the same dir.");
-    // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.REGULAR);
-    SecondaryNameNode secondary2 = null;
-    try {
-      secondary2 = startSecondaryNameNode(conf);
-      assertFalse(secondary2.getFSImage().getStorage().isLockSupported(0));
-      secondary2.shutdown();
-    } catch (IOException e) { // expected to fail
-      assertTrue(secondary2 == null);
+        assertFalse("Should fail to start 2NN when " + savedSd + " is locked",
+            savedSd.isLockSupported());
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains("already locked", ioe);
+      } finally {
+        savedSd.unlock();
+      }
+      
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      cluster.shutdown();
     }
-    nn.stop(); nn = null;
-    secondary.shutdown();
+  }
+  
 
-    // Import a checkpoint with existing primary image.
-    System.out.println("Import a checkpoint with existing primary image.");
+  /**
+   * Assert that the given storage directory can't be locked, because
+   * it's already locked.
+   */
+  private static void assertLockFails(StorageDirectory sd) {
     try {
-      nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                          StartupOption.IMPORT);
-      assertTrue(false);
-    } catch (IOException e) { // expected to fail
-      assertTrue(nn == null);
-    }
-    
-    // Remove current image and import a checkpoint.
-    System.out.println("Import a checkpoint with existing primary image.");
-    List<URI> nameDirs = (List<URI>)FSNamesystem.getNamespaceDirs(conf);
-    List<URI> nameEditsDirs = (List<URI>)FSNamesystem.
-                                  getNamespaceEditsDirs(conf);
-    long fsimageLength = new File(new File(nameDirs.get(0).getPath(), "current"), 
-                                        NameNodeFile.IMAGE.getName()).length();
-    for(URI uri : nameDirs) {
-      File dir = new File(uri.getPath());
-      if(dir.exists())
-        if(!(FileUtil.fullyDelete(dir)))
-          throw new IOException("Cannot remove directory: " + dir);
-      if (!dir.mkdirs())
-        throw new IOException("Cannot create directory " + dir);
+      sd.lock();
+      // If the above line didn't throw an exception, then
+      // locking must not be supported
+      assertFalse(sd.isLockSupported());
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("already locked", ioe);
     }
-
-    for(URI uri : nameEditsDirs) {
-      File dir = new File(uri.getPath());
-      if(dir.exists())
-        if(!(FileUtil.fullyDelete(dir)))
-          throw new IOException("Cannot remove directory: " + dir);
-      if (!dir.mkdirs())
-        throw new IOException("Cannot create directory " + dir);
-    }
-    
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.IMPORT);
-    // Verify that image file sizes did not change.
-    FSImage image = nn.getFSImage();
-    for (Iterator<StorageDirectory> it = 
-            image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      assertTrue(image.getStorage().getStorageFile(it.next(), 
-                          NameNodeFile.IMAGE).length() == fsimageLength);
-    }
-    nn.stop();
-
-    // recover failed checkpoint
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.REGULAR);
-    Collection<URI> secondaryDirs = FSImage.getCheckpointDirs(conf, null);
-    for(URI uri : secondaryDirs) {
-      File dir = new File(uri.getPath());
-      Storage.rename(new File(dir, "current"), 
-                     new File(dir, "lastcheckpoint.tmp"));
+  }
+  
+  /**
+   * Assert that, if sdToLock is locked, the cluster is not allowed to start up.
+   * @param conf cluster conf to use
+   * @param sdToLock the storage directory to lock
+   */
+  private static void assertClusterStartFailsWhenDirLocked(
+      Configuration conf, StorageDirectory sdToLock) throws IOException {
+    // Lock the edits dir, then start the NN, and make sure it fails to start
+    sdToLock.lock();
+    try {      
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .manageNameDfsDirs(false)
+        .numDataNodes(0)
+        .build();
+      assertFalse("cluster should fail to start after locking " +
+          sdToLock, sdToLock.isLockSupported());
+      cluster.shutdown();
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("already locked", ioe);
+    } finally {
+      sdToLock.unlock();
     }
-    secondary = startSecondaryNameNode(conf);
-    secondary.shutdown();
-    for(URI uri : secondaryDirs) {
-      File dir = new File(uri.getPath());
-      assertTrue(new File(dir, "current").exists()); 
-      assertFalse(new File(dir, "lastcheckpoint.tmp").exists());
+  }
+
+  /**
+   * Test the importCheckpoint startup option. Verifies:
+   * 1. if the NN already contains an image, it will not be allowed
+   *   to import a checkpoint.
+   * 2. if the NN does not contain an image, importing a checkpoint
+   *    succeeds and re-saves the image
+   */
+  @SuppressWarnings("deprecation")
+  public void testImportCheckpoint() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    Path testPath = new Path("/testfile");
+    SecondaryNameNode snn = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    Collection<URI> nameDirs = cluster.getNameDirs(0);
+    try {
+      // Make an entry in the namespace, used for verifying checkpoint
+      // later.
+      cluster.getFileSystem().mkdirs(testPath);
+      
+      // Take a checkpoint
+      snn = startSecondaryNameNode(conf);
+      snn.doCheckpoint();
+    } finally {
+      if (snn != null) {
+        snn.shutdown();
+      }
+      cluster.shutdown();
+      cluster = null;
     }
     
-    // complete failed checkpoint
-    for(URI uri : secondaryDirs) {
-      File dir = new File(uri.getPath());
-      Storage.rename(new File(dir, "previous.checkpoint"), 
-                     new File(dir, "lastcheckpoint.tmp"));
+    LOG.info("Trying to import checkpoint when the NameNode already " +
+    		"contains an image. This should fail.");
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .format(false)
+      .startupOption(StartupOption.IMPORT)
+      .build();
+      fail("NameNode did not fail to start when it already contained " +
+      		"an image");
+    } catch (IOException ioe) {
+      // Expected
+      GenericTestUtils.assertExceptionContains(
+          "NameNode already contains an image", ioe);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
     }
-    secondary = startSecondaryNameNode(conf);
-    secondary.shutdown();
-    for(URI uri : secondaryDirs) {
+    
+    LOG.info("Removing NN storage contents");
+    for(URI uri : nameDirs) {
       File dir = new File(uri.getPath());
-      assertTrue(new File(dir, "current").exists()); 
-      assertTrue(new File(dir, "previous.checkpoint").exists()); 
-      assertFalse(new File(dir, "lastcheckpoint.tmp").exists());
+      LOG.info("Cleaning " + dir);
+      removeAndRecreateDir(dir);
     }
-    nn.stop(); nn = null;
     
-    // Check that everything starts ok now.
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
-    cluster.waitActive();
-    cluster.shutdown();
-  }
+    LOG.info("Trying to import checkpoint");
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .format(false)
+        .numDataNodes(0)
+        .startupOption(StartupOption.IMPORT)
+        .build();
+      
+      assertTrue("Path from checkpoint should exist after import",
+          cluster.getFileSystem().exists(testPath));
 
-  NameNode startNameNode( Configuration conf,
-                          String imageDirs,
-                          String editsDirs,
-                          StartupOption start) throws IOException {
-    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");  
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, imageDirs);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, editsDirs);
-    String[] args = new String[]{start.getName()};
-    NameNode nn = NameNode.createNameNode(args, conf);
-    assertTrue(nn.isInSafeMode());
-    return nn;
+      // Make sure that the image got saved on import
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, Ints.asList(3));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
-
+  
+  private static void removeAndRecreateDir(File dir) throws IOException {
+    if(dir.exists())
+      if(!(FileUtil.fullyDelete(dir)))
+        throw new IOException("Cannot remove directory: " + dir);
+    if (!dir.mkdirs())
+      throw new IOException("Cannot create directory " + dir);
+  }
+  
   // This deprecation suppress warning does not work due to known Java bug:
   // http://bugs.sun.com/view_bug.do?bug_id=6460147
   @SuppressWarnings("deprecation")
@@ -701,6 +767,16 @@ public class TestCheckpoint extends Test
     conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     return new SecondaryNameNode(conf);
   }
+  
+  @SuppressWarnings("deprecation")
+  SecondaryNameNode startSecondaryNameNode(Configuration conf, int index)
+      throws IOException {
+    Configuration snnConf = new Configuration(conf);
+    snnConf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    snnConf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/2nn-" + index);
+    return new SecondaryNameNode(snnConf);
+  }
 
   /**
    * Tests checkpoint in HDFS.
@@ -709,8 +785,6 @@ public class TestCheckpoint extends Test
   public void testCheckpoint() throws IOException {
     Path file1 = new Path("checkpoint.dat");
     Path file2 = new Path("checkpoint2.dat");
-    Collection<URI> namedirs = null;
-
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
@@ -725,8 +799,7 @@ public class TestCheckpoint extends Test
       //
       assertTrue(!fileSys.exists(file1));
       assertTrue(!fileSys.exists(file2));
-      namedirs = cluster.getNameDirs(0);
-
+      
       //
       // Create file1
       //
@@ -737,7 +810,6 @@ public class TestCheckpoint extends Test
       // Take a checkpoint
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      ErrorSimulator.initializeErrorSimulationEvent(4);
       secondary.doCheckpoint();
       secondary.shutdown();
     } finally {
@@ -795,16 +867,6 @@ public class TestCheckpoint extends Test
       fileSys.close();
       cluster.shutdown();
     }
-
-    // file2 is left behind.
-
-    testNameNodeImageSendFail(conf);
-    testSecondaryNamenodeError1(conf);
-    testSecondaryNamenodeError2(conf);
-    testSecondaryNamenodeError3(conf);
-    testNamedirError(conf, namedirs);
-    testSecondaryFailsToReturnImage(conf);
-    testStartup(conf);
   }
 
   /**
@@ -816,7 +878,7 @@ public class TestCheckpoint extends Test
     FileContext fc;
     try {
       Configuration conf = new HdfsConfiguration();
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(true).build();
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
       fc = FileContext.getFileContext(cluster.getURI(0));
@@ -845,7 +907,9 @@ public class TestCheckpoint extends Test
       Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
       for(URI uri : editsDirs) {
         File ed = new File(uri.getPath());
-        assertTrue(new File(ed, "current/edits").length() > Integer.SIZE/Byte.SIZE);
+        assertTrue(new File(ed, "current/"
+                            + NNStorage.getInProgressEditsFileName(1))
+                   .length() > Integer.SIZE/Byte.SIZE);
       }
 
       // Saving image in safe mode should succeed
@@ -855,10 +919,37 @@ public class TestCheckpoint extends Test
       } catch(Exception e) {
         throw new IOException(e);
       }
-      // verify that the edits file is empty
+      
+      // the following steps should have happened:
+      //   edits_inprogress_1 -> edits_1-8  (finalized)
+      //   fsimage_8 created
+      //   edits_inprogress_9 created
+      //
       for(URI uri : editsDirs) {
         File ed = new File(uri.getPath());
-        assertTrue(new File(ed, "current/edits").length() == Integer.SIZE/Byte.SIZE);
+        File curDir = new File(ed, "current");
+        LOG.info("Files in " + curDir + ":\n  " +
+            Joiner.on("\n  ").join(curDir.list()));
+        // Verify that the first edits file got finalized
+        File originalEdits = new File(curDir,
+                                      NNStorage.getInProgressEditsFileName(1));
+        assertFalse(originalEdits.exists());
+        File finalizedEdits = new File(curDir,
+            NNStorage.getFinalizedEditsFileName(1,8));
+        assertTrue(finalizedEdits.exists());
+        assertTrue(finalizedEdits.length() > Integer.SIZE/Byte.SIZE);
+
+        assertTrue(new File(ed, "current/"
+                       + NNStorage.getInProgressEditsFileName(9)).exists());
+      }
+      
+      Collection<URI> imageDirs = cluster.getNameDirs(0);
+      for (URI uri : imageDirs) {
+        File imageDir = new File(uri.getPath());
+        File savedImage = new File(imageDir, "current/"
+                                   + NNStorage.getImageFileName(8));
+        assertTrue("Should have saved image at " + savedImage,
+            savedImage.exists());        
       }
 
       // restart cluster and verify file exists
@@ -872,8 +963,12 @@ public class TestCheckpoint extends Test
       fc = FileContext.getFileContext(cluster.getURI(0));
       assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
     } finally {
-      if(fs != null) fs.close();
-      if(cluster!= null) cluster.shutdown();
+      try {
+        if(fs != null) fs.close();
+        if(cluster!= null) cluster.shutdown();
+      } catch (Throwable t) {
+        LOG.error("Failed to shutdown", t);
+      }
     }
   }
   
@@ -885,7 +980,7 @@ public class TestCheckpoint extends Test
     Configuration conf = new HdfsConfiguration();
 
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
-        .format(false).build();
+        .format(true).build();
     NameNode nn = cluster.getNameNode();
 
     SecondaryNameNode secondary = startSecondaryNameNode(conf);
@@ -906,6 +1001,61 @@ public class TestCheckpoint extends Test
   }
   
   /**
+   * Tests the following sequence of events:
+   * - secondary successfully makes a checkpoint
+   * - it then fails while trying to upload it
+   * - it then fails again for the same reason
+   * - it then tries to checkpoint a third time
+   */
+  @SuppressWarnings("deprecation")
+  public void testCheckpointAfterTwoFailedUploads() throws IOException {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    
+    Configuration conf = new HdfsConfiguration();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .format(true).build();
+  
+      secondary = startSecondaryNameNode(conf);
+
+      ErrorSimulator.setErrorSimulation(1);
+      
+      // Fail to checkpoint once
+      try {
+        secondary.doCheckpoint();
+        fail("Should have failed upload");
+      } catch (IOException ioe) {
+        LOG.info("Got expected failure", ioe);
+        assertTrue(ioe.toString().contains("Simulating error1"));
+      }
+
+      // Fail to checkpoint again
+      try {
+        secondary.doCheckpoint();
+        fail("Should have failed upload");
+      } catch (IOException ioe) {
+        LOG.info("Got expected failure", ioe);
+        assertTrue(ioe.toString().contains("Simulating error1"));
+      } finally {
+        ErrorSimulator.clearErrorSimulation(1);
+      }
+
+      // Now with the cleared error simulation, it should succeed
+      secondary.doCheckpoint();
+      
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
    * Starts two namenodes and two secondary namenodes, verifies that secondary
    * namenodes are configured correctly to talk to their respective namenodes
    * and can do the checkpoint.
@@ -957,19 +1107,18 @@ public class TestCheckpoint extends Test
   }
   
   /**
-   * Simulate a secondary node failure to transfer image
-   * back to the name-node.
-   * Used to truncate primary fsimage file.
+   * Test that the secondary doesn't have to re-download image
+   * if it hasn't changed.
    */
   @SuppressWarnings("deprecation")
-  public void testSecondaryImageDownload(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryImageDownload");
+  public void testSecondaryImageDownload() throws IOException {
+    LOG.info("Starting testSecondaryImageDownload");
+    Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     Path dir = new Path("/checkpoint");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .format(true).build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     FSImage image = cluster.getNameNode().getFSImage();
@@ -979,28 +1128,48 @@ public class TestCheckpoint extends Test
       // Make the checkpoint
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      long fsimageLength = image.getStorage()
-        .getStorageFile(image.getStorage().dirIterator(NameNodeDirType.IMAGE).next(),
-                        NameNodeFile.IMAGE).length();
-      assertFalse("Image is downloaded", secondary.doCheckpoint());
 
-      // Verify that image file sizes did not change.
-      for (Iterator<StorageDirectory> it = 
-             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size does not change", image.getStorage().getStorageFile(it.next(), 
-                                NameNodeFile.IMAGE).length() == fsimageLength);
-      }
+      File secondaryDir = new File(MiniDFSCluster.getBaseDirectory(), "namesecondary1");
+      File secondaryCurrent = new File(secondaryDir, "current");
+
+      long expectedTxIdToDownload = cluster.getNameNode().getFSImage()
+      .getStorage().getMostRecentCheckpointTxId();
+
+      File secondaryFsImageBefore = new File(secondaryCurrent,
+          NNStorage.getImageFileName(expectedTxIdToDownload));
+      File secondaryFsImageAfter = new File(secondaryCurrent,
+          NNStorage.getImageFileName(expectedTxIdToDownload + 2));
+      
+      assertFalse("Secondary should start with empty current/ dir " +
+          "but " + secondaryFsImageBefore + " exists",
+          secondaryFsImageBefore.exists());
+
+      assertTrue("Secondary should have loaded an image",
+          secondary.doCheckpoint());
+      
+      assertTrue("Secondary should have downloaded original image",
+          secondaryFsImageBefore.exists());
+      assertTrue("Secondary should have created a new image",
+          secondaryFsImageAfter.exists());
+      
+      long fsimageLength = secondaryFsImageBefore.length();
+      assertEquals("Image size should not have changed",
+          fsimageLength,
+          secondaryFsImageAfter.length());
 
       // change namespace
       fileSys.mkdirs(dir);
-      assertTrue("Image is not downloaded", secondary.doCheckpoint());
-
-      for (Iterator<StorageDirectory> it = 
-             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size increased", 
-                   image.getStorage().getStorageFile(it.next(), 
-                                                     NameNodeFile.IMAGE).length() > fsimageLength);
-     }
+      
+      assertFalse("Another checkpoint should not have to re-load image",
+          secondary.doCheckpoint());
+      
+      for (StorageDirectory sd :
+        image.getStorage().dirIterable(NameNodeDirType.IMAGE)) {
+        File imageFile = NNStorage.getImageFile(sd,
+            expectedTxIdToDownload + 5);
+        assertTrue("Image size increased",
+            imageFile.length() > fsimageLength);
+      }
 
       secondary.shutdown();
     } finally {
@@ -1008,4 +1177,660 @@ public class TestCheckpoint extends Test
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Test case where two secondary namenodes are checkpointing the same
+   * NameNode. This differs from {@link #testMultipleSecondaryNamenodes()}
+   * since that test runs against two distinct NNs.
+   * 
+   * This case tests the following interleaving:
+   * - 2NN A downloads image (up to txid 2)
+   * - 2NN A about to save its own checkpoint
+   * - 2NN B downloads image (up to txid 4)
+   * - 2NN B uploads checkpoint (txid 4)
+   * - 2NN A uploads checkpoint (txid 2)
+   * 
+   * It verifies that this works even though the earlier-txid checkpoint gets
+   * uploaded after the later-txid checkpoint.
+   */
+  @SuppressWarnings("deprecation")
+  public void testMultipleSecondaryNNsAgainstSameNN() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    .numDataNodes(0)
+    .format(true).build();
+
+    SecondaryNameNode secondary1 = null, secondary2 = null;
+    try {
+      // Start 2NNs
+      secondary1 = startSecondaryNameNode(conf, 1);
+      secondary2 = startSecondaryNameNode(conf, 2);
+      
+      // Make the first 2NN's checkpoint process delayable - we can pause it
+      // right before it saves its checkpoint image.
+      CheckpointStorage spyImage1 = spyOnSecondaryImage(secondary1);
+      DelayAnswer delayer = new DelayAnswer(LOG);
+      Mockito.doAnswer(delayer).when(spyImage1)
+        .saveFSImageInAllDirs(Mockito.anyLong());
+
+      // Set up a thread to do a checkpoint from the first 2NN
+      DoCheckpointThread checkpointThread = new DoCheckpointThread(secondary1);
+      checkpointThread.start();
+
+      // Wait for the first checkpointer to get to where it should save its image.
+      delayer.waitForCall();
+      
+      // Now make the second checkpointer run an entire checkpoint
+      secondary2.doCheckpoint();
+      
+      // Let the first one finish
+      delayer.proceed();
+      
+      // It should have succeeded even though another checkpoint raced with it.
+      checkpointThread.join();
+      checkpointThread.propagateExceptions();
+      
+      // primary should record "last checkpoint" as the higher txid (even though
+      // a checkpoint with a lower txid finished most recently)
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      assertEquals(4, storage.getMostRecentCheckpointTxId());
+
+      // Should have accepted both checkpoints
+      assertNNHasCheckpoints(cluster, ImmutableList.of(2,4));
+      
+      // Now have second one checkpoint one more time just to make sure that
+      // the NN isn't left in a broken state
+      secondary2.doCheckpoint();
+      
+      // NN should have received new checkpoint
+      assertEquals(6, storage.getMostRecentCheckpointTxId());
+    } finally {
+      cleanup(secondary1);
+      cleanup(secondary2);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+    
+    // Validate invariant that files named the same are the same.
+    assertParallelFilesInvariant(cluster, ImmutableList.of(secondary1, secondary2));
+
+    // NN should have removed the checkpoint at txid 2 at this point, but has
+    // one at txid 6
+    assertNNHasCheckpoints(cluster, ImmutableList.of(4,6));
+  }
+  
+  
+  /**
+   * Test case where two secondary namenodes are checkpointing the same
+   * NameNode. This differs from {@link #testMultipleSecondaryNamenodes()}
+   * since that test runs against two distinct NNs.
+   * 
+   * This case tests the following interleaving:
+   * - 2NN A) calls rollEdits()
+   * - 2NN B) calls rollEdits()
+   * - 2NN A) paused at getRemoteEditLogManifest()
+   * - 2NN B) calls getRemoteEditLogManifest() (returns up to txid 4)
+   * - 2NN B) uploads checkpoint fsimage_4
+   * - 2NN A) allowed to proceed, also returns up to txid 4
+   * - 2NN A) uploads checkpoint fsimage_4 as well, should fail gracefully
+   * 
+   * It verifies that one of the two gets an error that it's uploading a
+   * duplicate checkpoint, and the other one succeeds.
+   */
+  @SuppressWarnings("deprecation")
+  public void testMultipleSecondaryNNsAgainstSameNN2() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    .numDataNodes(0)
+    .format(true).build();
+
+    SecondaryNameNode secondary1 = null, secondary2 = null;
+    try {
+      // Start 2NNs
+      secondary1 = startSecondaryNameNode(conf, 1);
+      secondary2 = startSecondaryNameNode(conf, 2);
+      
+      // Make the first 2NN's checkpoint process delayable - we can pause it
+      // right before it calls getRemoteEditLogManifest.
+      // The method to set up a spy on an RPC protocol is a little bit involved
+      // since we can't spy directly on a proxy object. This sets up a mock
+      // which delegates all its calls to the original object, instead.
+      final NamenodeProtocol origNN = secondary1.getNameNode();
+      final Answer<Object> delegator = new GenericTestUtils.DelegateAnswer(origNN);
+      NamenodeProtocol spyNN = Mockito.mock(NamenodeProtocol.class, delegator);
+      DelayAnswer delayer = new DelayAnswer(LOG) {
+        protected Object passThrough(InvocationOnMock invocation) throws Throwable {
+          return delegator.answer(invocation);
+        }
+      };
+      secondary1.setNameNode(spyNN);
+      
+      Mockito.doAnswer(delayer).when(spyNN)
+        .getEditLogManifest(Mockito.anyLong());      
+          
+      // Set up a thread to do a checkpoint from the first 2NN
+      DoCheckpointThread checkpointThread = new DoCheckpointThread(secondary1);
+      checkpointThread.start();
+
+      // Wait for the first checkpointer to be about to call getEditLogManifest
+      delayer.waitForCall();
+      
+      // Now make the second checkpointer run an entire checkpoint
+      secondary2.doCheckpoint();
+      
+      // NN should have now received fsimage_4
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      assertEquals(4, storage.getMostRecentCheckpointTxId());
+      
+      // Let the first one finish
+      delayer.proceed();
+      
+      // Letting the first node continue should catch an exception
+      checkpointThread.join();
+      try {
+        checkpointThread.propagateExceptions();
+        fail("Didn't throw!");
+      } catch (Exception ioe) {
+        assertTrue("Unexpected exception: " +
+            StringUtils.stringifyException(ioe),
+            ioe.toString().contains("Another checkpointer already uploaded"));
+        LOG.info("Caught expected exception", ioe);
+      }
+      
+      // primary should still consider fsimage_4 the latest
+      assertEquals(4, storage.getMostRecentCheckpointTxId());
+      
+      // Now have second one checkpoint one more time just to make sure that
+      // the NN isn't left in a broken state
+      secondary2.doCheckpoint();
+      assertEquals(6, storage.getMostRecentCheckpointTxId());
+      
+      // Should have accepted both checkpoints
+      assertNNHasCheckpoints(cluster, ImmutableList.of(4,6));
+
+      // Let the first one also go again on its own to make sure it can
+      // continue at next checkpoint
+      secondary1.setNameNode(origNN);
+      secondary1.doCheckpoint();
+      
+      // NN should have received new checkpoint
+      assertEquals(8, storage.getMostRecentCheckpointTxId());
+    } finally {
+      cleanup(secondary1);
+      cleanup(secondary2);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+    
+    // Validate invariant that files named the same are the same.
+    assertParallelFilesInvariant(cluster, ImmutableList.of(secondary1, secondary2));
+    // Validate that the NN received checkpoints at expected txids
+    // (i.e that both checkpoints went through)
+    assertNNHasCheckpoints(cluster, ImmutableList.of(6,8));
+  }
+  
+  /**
+   * Test case where the name node is reformatted while the secondary namenode
+   * is running. The secondary should shut itself down if if talks to a NN
+   * with the wrong namespace.
+   */
+  @SuppressWarnings("deprecation")
+  public void testReformatNNBetweenCheckpoints() throws IOException {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        1);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+      int origPort = cluster.getNameNodePort();
+      int origHttpPort = cluster.getNameNode().getHttpAddress().getPort();
+      secondary = startSecondaryNameNode(conf);
+
+      // secondary checkpoints once
+      secondary.doCheckpoint();
+
+      // we reformat primary NN
+      cluster.shutdown();
+      cluster = null;
+
+      // Brief sleep to make sure that the 2NN's IPC connection to the NN
+      // is dropped.
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+      }
+      
+      // Start a new NN with the same host/port.
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .nameNodePort(origPort)
+        .nameNodeHttpPort(origHttpPort)
+        .format(true).build();
+
+      try {
+        secondary.doCheckpoint();
+        fail("Should have failed checkpoint against a different namespace");
+      } catch (IOException ioe) {
+        LOG.info("Got expected failure", ioe);
+        assertTrue(ioe.toString().contains("Inconsistent checkpoint"));
+      }
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }  
+  }
+  
+  /**
+   * Test that the primary NN will not serve any files to a 2NN who doesn't
+   * share its namespace ID, and also will not accept any files from one.
+   */
+  public void testNamespaceVerifiedOnFileTransfer() throws IOException {
+    MiniDFSCluster cluster = null;
+    
+    Configuration conf = new HdfsConfiguration();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+      
+      NameNode nn = cluster.getNameNode();
+      String fsName = NameNode.getHostPortString(nn.getHttpAddress());
+
+
+      // Make a finalized log on the server side. 
+      nn.rollEditLog();
+      RemoteEditLogManifest manifest = nn.getEditLogManifest(0);
+      RemoteEditLog log = manifest.getLogs().get(0);
+      
+      NNStorage dstImage = Mockito.mock(NNStorage.class);
+      Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written")))
+        .when(dstImage).getFiles(
+            Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
+      
+      Mockito.doReturn(new StorageInfo(1, 1, "X", 1).toColonSeparatedString())
+        .when(dstImage).toColonSeparatedString();
+
+      try {
+        TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false);
+        fail("Storage info was not verified");
+      } catch (IOException ioe) {
+        String msg = StringUtils.stringifyException(ioe);
+        assertTrue(msg, msg.contains("but the secondary expected"));
+      }
+
+      try {
+        TransferFsImage.downloadEditsToStorage(fsName, log, dstImage);
+        fail("Storage info was not verified");
+      } catch (IOException ioe) {
+        String msg = StringUtils.stringifyException(ioe);
+        assertTrue(msg, msg.contains("but the secondary expected"));
+      }
+
+      try {
+        InetSocketAddress fakeAddr = new InetSocketAddress(1);
+        TransferFsImage.uploadImageFromStorage(fsName, fakeAddr, dstImage, 0);
+        fail("Storage info was not verified");
+      } catch (IOException ioe) {
+        String msg = StringUtils.stringifyException(ioe);
+        assertTrue(msg, msg.contains("but the secondary expected"));
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }  
+  }
+
+  /**
+   * Test that, if a storage directory is failed when a checkpoint occurs,
+   * the non-failed storage directory receives the checkpoint.
+   */
+  @SuppressWarnings("deprecation")
+  public void testCheckpointWithFailedStorageDir() throws Exception {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    File currentDir = null;
+    
+    Configuration conf = new HdfsConfiguration();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+  
+      secondary = startSecondaryNameNode(conf);
+
+      // Checkpoint once
+      secondary.doCheckpoint();
+
+      // Now primary NN experiences failure of a volume -- fake by
+      // setting its current dir to a-x permissions
+      NameNode nn = cluster.getNameNode();
+      NNStorage storage = nn.getFSImage().getStorage();
+      StorageDirectory sd0 = storage.getStorageDir(0);
+      StorageDirectory sd1 = storage.getStorageDir(1);
+      
+      currentDir = sd0.getCurrentDir();
+      currentDir.setExecutable(false);
+
+      // Upload checkpoint when NN has a bad storage dir. This should
+      // succeed and create the checkpoint in the good dir.
+      secondary.doCheckpoint();
+      
+      GenericTestUtils.assertExists(
+          new File(sd1.getCurrentDir(), NNStorage.getImageFileName(2)));
+      
+      // Restore the good dir
+      currentDir.setExecutable(true);
+      nn.restoreFailedStorage("true");
+      nn.rollEditLog();
+
+      // Checkpoint again -- this should upload to both dirs
+      secondary.doCheckpoint();
+      
+      assertNNHasCheckpoints(cluster, ImmutableList.of(8));
+      assertParallelFilesInvariant(cluster, ImmutableList.of(secondary));
+    } finally {
+      if (currentDir != null) {
+        currentDir.setExecutable(true);
+      }
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Test case where the NN is configured with a name-only and an edits-only
+   * dir, with storage-restore turned on. In this case, if the name-only dir
+   * disappears and comes back, a new checkpoint after it has been restored
+   * should function correctly.
+   * @throws Exception
+   */
+  @SuppressWarnings("deprecation")
+  public void testCheckpointWithSeparateDirsAfterNameFails() throws Exception {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    File currentDir = null;
+    
+    Configuration conf = new HdfsConfiguration();
+
+    File base_dir = new File(MiniDFSCluster.getBaseDirectory());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, true);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/name-only");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/edits-only");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        fileAsURI(new File(base_dir, "namesecondary1")).toString());
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true)
+          .manageNameDfsDirs(false)
+          .build();
+  
+      secondary = startSecondaryNameNode(conf);
+
+      // Checkpoint once
+      secondary.doCheckpoint();
+
+      // Now primary NN experiences failure of its only name dir -- fake by
+      // setting its current dir to a-x permissions
+      NameNode nn = cluster.getNameNode();
+      NNStorage storage = nn.getFSImage().getStorage();
+      StorageDirectory sd0 = storage.getStorageDir(0);
+      assertEquals(NameNodeDirType.IMAGE, sd0.getStorageDirType());
+      currentDir = sd0.getCurrentDir();
+      currentDir.setExecutable(false);
+
+      // Try to upload checkpoint -- this should fail since there are no
+      // valid storage dirs
+      try {
+        secondary.doCheckpoint();
+        fail("Did not fail to checkpoint when there are no valid storage dirs");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "No targets in destination storage", ioe);
+      }
+      
+      // Restore the good dir
+      currentDir.setExecutable(true);
+      nn.restoreFailedStorage("true");
+      nn.rollEditLog();
+
+      // Checkpoint again -- this should upload to the restored name dir
+      secondary.doCheckpoint();
+      
+      assertNNHasCheckpoints(cluster, ImmutableList.of(8));
+      assertParallelFilesInvariant(cluster, ImmutableList.of(secondary));
+    } finally {
+      if (currentDir != null) {
+        currentDir.setExecutable(true);
+      }
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Test that the 2NN triggers a checkpoint after the configurable interval
+   */
+  @SuppressWarnings("deprecation")
+  public void testCheckpointTriggerOnTxnCount() throws Exception {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+      FileSystem fs = cluster.getFileSystem();
+      secondary = startSecondaryNameNode(conf);
+      Thread t = new Thread(secondary);
+      t.start();
+      final NNStorage storage = secondary.getFSImage().getStorage();
+
+      // 2NN should checkpoint at startup
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          LOG.info("Waiting for checkpoint txn id to go to 2");
+          return storage.getMostRecentCheckpointTxId() == 2;
+        }
+      }, 200, 15000);
+
+      // If we make 10 transactions, it should checkpoint again
+      for (int i = 0; i < 10; i++) {
+        fs.mkdirs(new Path("/test" + i));
+      }
+      
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          LOG.info("Waiting for checkpoint txn id to go > 2");
+          return storage.getMostRecentCheckpointTxId() > 2;
+        }
+      }, 200, 15000);
+    } finally {
+      cleanup(secondary);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+
+  /**
+   * Test case where the secondary does a checkpoint, then stops for a while.
+   * In the meantime, the NN saves its image several times, so that the
+   * logs that connect the 2NN's old checkpoint to the current txid
+   * get archived. Then, the 2NN tries to checkpoint again.
+   */
+  @SuppressWarnings("deprecation")
+  public void testSecondaryHasVeryOutOfDateImage() throws IOException {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+    
+    Configuration conf = new HdfsConfiguration();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+          .format(true).build();
+  
+      secondary = startSecondaryNameNode(conf);
+
+      // Checkpoint once
+      secondary.doCheckpoint();
+
+      // Now primary NN saves namespace 3 times
+      NameNode nn = cluster.getNameNode();
+      nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      for (int i = 0; i < 3; i++) {
+        nn.saveNamespace();
+      }
+      nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      
+      // Now the secondary tries to checkpoint again with its
+      // old image in memory.
+      secondary.doCheckpoint();
+      
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  public void testCommandLineParsing() throws ParseException {
+    SecondaryNameNode.CommandLineOpts opts =
+      new SecondaryNameNode.CommandLineOpts();
+    opts.parse();
+    assertNull(opts.getCommand());
+
+    opts.parse("-checkpoint");
+    assertEquals(SecondaryNameNode.CommandLineOpts.Command.CHECKPOINT,
+        opts.getCommand());
+    assertFalse(opts.shouldForceCheckpoint());
+
+    opts.parse("-checkpoint", "force");
+    assertEquals(SecondaryNameNode.CommandLineOpts.Command.CHECKPOINT,
+        opts.getCommand());
+    assertTrue(opts.shouldForceCheckpoint());
+
+    opts.parse("-geteditsize");
+    assertEquals(SecondaryNameNode.CommandLineOpts.Command.GETEDITSIZE,
+        opts.getCommand());
+    
+    opts.parse("-format");
+    assertTrue(opts.shouldFormat());
+    
+    try {
+      opts.parse("-geteditsize", "-checkpoint");
+      fail("Should have failed bad parsing for two actions");
+    } catch (ParseException e) {}
+    
+    try {
+      opts.parse("-checkpoint", "xx");
+      fail("Should have failed for bad checkpoint arg");
+    } catch (ParseException e) {}
+  }
+
+  @SuppressWarnings("deprecation")
+  private void cleanup(SecondaryNameNode snn) {
+    if (snn != null) {
+      try {
+        snn.shutdown();
+      } catch (Exception e) {
+        LOG.warn("Could not shut down secondary namenode", e);
+      }
+    }
+  }
+
+
+  /**
+   * Assert that if any two files have the same name across the 2NNs
+   * and NN, they should have the same content too.
+   */
+  @SuppressWarnings("deprecation")
+  private void assertParallelFilesInvariant(MiniDFSCluster cluster,
+      ImmutableList<SecondaryNameNode> secondaries) throws Exception {
+    List<File> allCurrentDirs = Lists.newArrayList();
+    allCurrentDirs.addAll(getNameNodeCurrentDirs(cluster));
+    for (SecondaryNameNode snn : secondaries) {
+      allCurrentDirs.addAll(getCheckpointCurrentDirs(snn));
+    }
+    FSImageTestUtil.assertParallelFilesAreIdentical(allCurrentDirs,
+        ImmutableSet.of("VERSION"));    
+  }
+  
+  @SuppressWarnings("deprecation")
+  private List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
+    List<File> ret = Lists.newArrayList();
+    for (URI u : secondary.getCheckpointDirs()) {
+      File checkpointDir = new File(u.getPath());
+      ret.add(new File(checkpointDir, "current"));
+    }
+    return ret;
+  }
+
+  @SuppressWarnings("deprecation")
+  private CheckpointStorage spyOnSecondaryImage(SecondaryNameNode secondary1) {
+    CheckpointStorage spy = Mockito.spy((CheckpointStorage)secondary1.getFSImage());;
+    secondary1.setFSImage(spy);
+    return spy;
+  }
+  
+  /**
+   * A utility class to perform a checkpoint in a different thread.
+   */
+  @SuppressWarnings("deprecation")
+  private static class DoCheckpointThread extends Thread {
+    private final SecondaryNameNode snn;
+    private volatile Throwable thrown = null;
+    
+    DoCheckpointThread(SecondaryNameNode snn) {
+      this.snn = snn;
+    }
+    
+    @Override
+    public void run() {
+      try {
+        snn.doCheckpoint();
+      } catch (Throwable t) {
+        thrown = t;
+      }
+    }
+    
+    void propagateExceptions() {
+      if (thrown != null) {
+        throw new RuntimeException(thrown);
+      }
+    }
+  }
+
 }

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java Fri Jul 29 16:28:45 2011
@@ -50,7 +50,7 @@ public class TestClusterId {
     // see if cluster id not empty.
     Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(config);
     Collection<URI> editsToFormat = new ArrayList<URI>(0);
-    FSImage fsImage = new FSImage(dirsToFormat, editsToFormat);
+    FSImage fsImage = new FSImage(config, null, dirsToFormat, editsToFormat);
     
     Iterator<StorageDirectory> sdit = 
       fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);