You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC

svn commit: r1152295 [8/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Jul 29 16:28:45 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import junit.framework.TestCase;
 import java.io.*;
 import java.net.URI;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -27,44 +28,75 @@ import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
 
-import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.aspectj.util.FileUtil;
+
 import org.mockito.Mockito;
 
 import static org.apache.hadoop.test.MetricsAsserts.*;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.spy;
 
 /**
  * This class tests the creation and validation of a checkpoint.
  */
 public class TestEditLog extends TestCase {
-  private static final Log LOG = LogFactory.getLog(TestEditLog.class);
-
+  
+  static {
+    ((Log4JLogger)FSEditLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  static final Log LOG = LogFactory.getLog(TestEditLog.class);
+  
   static final int NUM_DATA_NODES = 0;
 
   // This test creates NUM_THREADS threads and each thread does
   // 2 * NUM_TRANSACTIONS Transactions concurrently.
   static final int NUM_TRANSACTIONS = 100;
   static final int NUM_THREADS = 100;
+  
+  private static final File TEST_DIR = new File(
+    System.getProperty("test.build.data","build/test/data"));
+
+  /** An edits log with 3 edits from 0.20 - the result of
+   * a fresh namesystem followed by hadoop fs -touchz /myfile */
+  static final byte[] HADOOP20_SOME_EDITS =
+    StringUtils.hexStringToByte((
+        "ffff ffed 0a00 0000 0000 03fa e100 0000" +
+        "0005 0007 2f6d 7966 696c 6500 0133 000d" +
+        "3132 3932 3331 3634 3034 3138 3400 0d31" +
+        "3239 3233 3136 3430 3431 3834 0009 3133" +
+        "3432 3137 3732 3800 0000 0004 746f 6464" +
+        "0a73 7570 6572 6772 6f75 7001 a400 1544" +
+        "4653 436c 6965 6e74 5f2d 3136 3136 3535" +
+        "3738 3931 000b 3137 322e 3239 2e35 2e33" +
+        "3209 0000 0005 0007 2f6d 7966 696c 6500" +
+        "0133 000d 3132 3932 3331 3634 3034 3138" +
+        "3400 0d31 3239 3233 3136 3430 3431 3834" +
+        "0009 3133 3432 3137 3732 3800 0000 0004" +
+        "746f 6464 0a73 7570 6572 6772 6f75 7001" +
+        "a4ff 0000 0000 0000 0000 0000 0000 0000"
+    ).replace(" ",""));
+
+  
+  static final byte TRAILER_BYTE = FSEditLogOpCodes.OP_INVALID.getOpCode();
 
+  private static final int CHECKPOINT_ON_STARTUP_MIN_TXNS = 100;
   //
   // an object that does a bunch of transactions
   //
@@ -96,14 +128,110 @@ public class TestEditLog extends TestCas
   }
 
   /**
+   * Test case for an empty edit log from a prior version of Hadoop.
+   */
+  public void testPreTxIdEditLogNoEdits() throws Exception {
+    FSNamesystem namesys = Mockito.mock(FSNamesystem.class);
+    namesys.dir = Mockito.mock(FSDirectory.class);
+    int numEdits = testLoad(
+        StringUtils.hexStringToByte("ffffffed"), // just version number
+        namesys);
+    assertEquals(0, numEdits);
+  }
+  
+  /**
+   * Test case for loading a very simple edit log from a format
+   * prior to the inclusion of edit transaction IDs in the log.
+   */
+  public void testPreTxidEditLogWithEdits() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      int numEdits = testLoad(HADOOP20_SOME_EDITS, namesystem);
+      assertEquals(3, numEdits);
+      // Sanity check the edit
+      HdfsFileStatus fileInfo = namesystem.getFileInfo("/myfile", false);
+      assertEquals("supergroup", fileInfo.getGroup());
+      assertEquals(3, fileInfo.getReplication());
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+  
+  private int testLoad(byte[] data, FSNamesystem namesys) throws IOException {
+    FSEditLogLoader loader = new FSEditLogLoader(namesys);
+    return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
+  }
+
+  /**
+   * Simple test for writing to and rolling the edit log.
+   */
+  public void testSimpleEditLog() throws IOException {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      FSImage fsimage = namesystem.getFSImage();
+      final FSEditLog editLog = fsimage.getEditLog();
+      
+      assertExistsInStorageDirs(
+          cluster, NameNodeDirType.EDITS, 
+          NNStorage.getInProgressEditsFileName(1));
+      
+
+      editLog.logSetReplication("fakefile", (short) 1);
+      editLog.logSync();
+      
+      editLog.rollEditLog();
+
+      assertExistsInStorageDirs(
+          cluster, NameNodeDirType.EDITS,
+          NNStorage.getFinalizedEditsFileName(1,3));
+      assertExistsInStorageDirs(
+          cluster, NameNodeDirType.EDITS,
+          NNStorage.getInProgressEditsFileName(4));
+
+      
+      editLog.logSetReplication("fakefile", (short) 2);
+      editLog.logSync();
+      
+      editLog.close();
+    } finally {
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+
+  /**
    * Tests transaction logging in dfs.
    */
-  public void testEditLog() throws IOException {
+  public void testMultiThreadedEditLog() throws IOException {
     testEditLog(2048);
     // force edit buffer to automatically sync on each log of edit log entry
     testEditLog(1);
   }
   
+  
+  private void assertExistsInStorageDirs(MiniDFSCluster cluster,
+      NameNodeDirType dirType,
+      String filename) {
+    NNStorage storage = cluster.getNamesystem().getFSImage().getStorage();
+    for (StorageDirectory sd : storage.dirIterable(dirType)) {
+      File f = new File(sd.getCurrentDir(), filename);
+      assertTrue("Expect that " + f + " exists", f.exists());
+    }
+  }
+  
   /**
    * Test edit log with different initial buffer size
    * 
@@ -132,9 +260,11 @@ public class TestEditLog extends TestCas
       FSEditLog editLog = fsimage.getEditLog();
   
       // set small size of flush buffer
-      editLog.setBufferCapacity(initialSize);
-      editLog.close();
-      editLog.open();
+      editLog.setOutputBufferCapacity(initialSize);
+      
+      // Roll log so new output buffer size takes effect
+      // we should now be writing to edits_inprogress_3
+      fsimage.rollEditLog();
     
       // Create threads and make them run transactions concurrently.
       Thread threadId[] = new Thread[NUM_THREADS];
@@ -153,32 +283,42 @@ public class TestEditLog extends TestCas
         }
       } 
       
-      editLog.close();
-      editLog.open();
-  
+      // Roll another time to finalize edits_inprogress_3
+      fsimage.rollEditLog();
+      
+      long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
+   
       // Verify that we can read in all the transactions that we have written.
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
-      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
               fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = NNStorage.getStorageFile(it.next(), NameNodeFile.EDITS);
+        FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+        
+        File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3,
+            3 + expectedTxns - 1);
+        assertTrue("Expect " + editFile + " exists", editFile.exists());
+        
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
-                                  new EditLogFileInputStream(editFile));
+            new EditLogFileInputStream(editFile), 3);
         int numLeases = namesystem.leaseManager.countLease();
         System.out.println("Number of outstanding leases " + numLeases);
         assertEquals(0, numLeases);
         assertTrue("Verification for " + editFile + " failed. " +
-                   "Expected " + (NUM_THREADS * 2 * NUM_TRANSACTIONS) + " transactions. "+
+                   "Expected " + expectedTxns + " transactions. "+
                    "Found " + numEdits + " transactions.",
-                   numEdits == NUM_THREADS * 2 * NUM_TRANSACTIONS);
+                   numEdits == expectedTxns);
   
       }
     } finally {
-      if(fileSys != null) fileSys.close();
-      if(cluster != null) cluster.shutdown();
+      try {
+        if(fileSys != null) fileSys.close();
+        if(cluster != null) cluster.shutdown();
+      } catch (Throwable t) {
+        LOG.error("Couldn't shut down cleanly", t);
+      }
     }
   }
 
@@ -231,29 +371,29 @@ public class TestEditLog extends TestCas
       FSImage fsimage = namesystem.getFSImage();
       final FSEditLog editLog = fsimage.getEditLog();
 
-      assertEquals("should start with no txids synced",
-        0, editLog.getSyncTxId());
+      assertEquals("should start with only the BEGIN_LOG_SEGMENT txn synced",
+        1, editLog.getSyncTxId());
       
       // Log an edit from thread A
       doLogEdit(threadA, editLog, "thread-a 1");
       assertEquals("logging edit without syncing should do not affect txid",
-        0, editLog.getSyncTxId());
+        1, editLog.getSyncTxId());
 
       // Log an edit from thread B
       doLogEdit(threadB, editLog, "thread-b 1");
       assertEquals("logging edit without syncing should do not affect txid",
-        0, editLog.getSyncTxId());
+        1, editLog.getSyncTxId());
 
       // Now ask to sync edit from B, which should sync both edits.
       doCallLogSync(threadB, editLog);
       assertEquals("logSync from second thread should bump txid up to 2",
-        2, editLog.getSyncTxId());
+        3, editLog.getSyncTxId());
 
       // Now ask to sync edit from A, which was already batched in - thus
       // it should increment the batch count metric
       doCallLogSync(threadA, editLog);
       assertEquals("logSync from first thread shouldn't change txid",
-        2, editLog.getSyncTxId());
+        3, editLog.getSyncTxId());
 
       //Should have incremented the batch count exactly once
       assertCounter("TransactionsBatchedInSync", 1L, 
@@ -296,12 +436,12 @@ public class TestEditLog extends TestCas
       // Log an edit from thread A
       doLogEdit(threadA, editLog, "thread-a 1");
       assertEquals("logging edit without syncing should do not affect txid",
-        0, editLog.getSyncTxId());
+        1, editLog.getSyncTxId());
 
       // logSyncAll in Thread B
       doCallLogSyncAll(threadB, editLog);
       assertEquals("logSyncAll should sync thread A's transaction",
-        1, editLog.getSyncTxId());
+        2, editLog.getSyncTxId());
 
       // Close edit log
       editLog.close();
@@ -329,10 +469,14 @@ public class TestEditLog extends TestCas
     FSImage fsimage = namesystem.getFSImage();
     final FSEditLog editLog = fsimage.getEditLog();
     fileSys.mkdirs(new Path("/tmp"));
-    File editFile = editLog.getFsEditName();
+    StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
     editLog.close();
     cluster.shutdown();
-      long fileLen = editFile.length();
+
+    File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
+    assertTrue(editFile.exists());
+
+    long fileLen = editFile.length();
     System.out.println("File name: " + editFile + " len: " + fileLen);
     RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
     rwf.seek(fileLen-4); // seek to checksum bytes
@@ -350,54 +494,239 @@ public class TestEditLog extends TestCas
           e.getCause().getClass(), ChecksumException.class);
     }
   }
+
+  /**
+   * Test what happens if the NN crashes when it has started but
+   * had no transactions written.
+   */
+  public void testCrashRecoveryNoTransactions() throws Exception {
+    testCrashRecovery(0);
+  }
   
-  public void testFailedOpen() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+  /**
+   * Test what happens if the NN crashes when it has started and
+   * had a few transactions written
+   */
+  public void testCrashRecoveryWithTransactions() throws Exception {
+    testCrashRecovery(150);
+  }
+  
+  /**
+   * Do a test to make sure the edit log can recover edits even after
+   * a non-clean shutdown. This does a simulated crash by copying over
+   * the edits directory while the NN is still running, then shutting it
+   * down, and restoring that edits directory.
+   */
+  private void testCrashRecovery(int numTransactions) throws Exception {
     MiniDFSCluster cluster = null;
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
-    cluster.waitActive();
-    final FSNamesystem fsn = cluster.getNamesystem();
-
-    // Set up spys
-    final FSImage originalImage = fsn.getFSImage();
-    NNStorage storage = originalImage.getStorage();
-    NNStorage spyStorage = spy(storage);
-    originalImage.storage = spyStorage;
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
+        CHECKPOINT_ON_STARTUP_MIN_TXNS);
     
-    final FSEditLog editLog = originalImage.getEditLog();
-    FSEditLog spyLog = spy(editLog);
+    try {
+        LOG.info("\n===========================================\n" +
+                 "Starting empty cluster");
+        
+        cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(NUM_DATA_NODES)
+          .format(true)
+          .build();
+        cluster.waitActive();
+        
+        FileSystem fs = cluster.getFileSystem();
+        for (int i = 0; i < numTransactions; i++) {
+          fs.mkdirs(new Path("/test" + i));
+        }        
+        
+        // Directory layout looks like:
+        // test/data/dfs/nameN/current/{fsimage_N,edits_...}
+        File nameDir = new File(cluster.getNameDirs(0).iterator().next().getPath());
+        File dfsDir = nameDir.getParentFile();
+        assertEquals(dfsDir.getName(), "dfs"); // make sure we got right dir
+        
+        LOG.info("Copying data directory aside to a hot backup");
+        File backupDir = new File(dfsDir.getParentFile(), "dfs.backup-while-running");
+        FileUtil.copyDir(dfsDir, backupDir);;
+
+        LOG.info("Shutting down cluster #1");
+        cluster.shutdown();
+        cluster = null;
+        
+        // Now restore the backup
+        FileUtil.deleteContents(dfsDir);
+        backupDir.renameTo(dfsDir);
+        
+        // Directory layout looks like:
+        // test/data/dfs/nameN/current/{fsimage_N,edits_...}
+        File currentDir = new File(nameDir, "current");
+
+        // We should see the file as in-progress
+        File editsFile = new File(currentDir,
+            NNStorage.getInProgressEditsFileName(1));
+        assertTrue("Edits file " + editsFile + " should exist", editsFile.exists());        
+        
+        File imageFile = FSImageTestUtil.findNewestImageFile(
+            currentDir.getAbsolutePath());
+        assertNotNull("No image found in " + nameDir, imageFile);
+        assertEquals(NNStorage.getImageFileName(0), imageFile.getName());
+        
+        // Try to start a new cluster
+        LOG.info("\n===========================================\n" +
+        "Starting same cluster after simulated crash");
+        cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(NUM_DATA_NODES)
+          .format(false)
+          .build();
+        cluster.waitActive();
+        
+        // We should still have the files we wrote prior to the simulated crash
+        fs = cluster.getFileSystem();
+        for (int i = 0; i < numTransactions; i++) {
+          assertTrue(fs.exists(new Path("/test" + i)));
+        }
 
-    FSImage spyImage = spy(originalImage);
-    fsn.dir.fsImage = spyImage;
-    spyImage.storage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
-        
-    // Fail every attempt to open a new edit file
-    doThrow(new IOException("Injected fault: open")).
-      when(spyLog).addNewEditLogStream((File)anyObject());
+        long expectedTxId;
+        if (numTransactions > CHECKPOINT_ON_STARTUP_MIN_TXNS) {
+          // It should have saved a checkpoint on startup since there
+          // were more unfinalized edits than configured
+          expectedTxId = numTransactions + 1;
+        } else {
+          // otherwise, it shouldn't have made a checkpoint
+          expectedTxId = 0;
+        }
+        imageFile = FSImageTestUtil.findNewestImageFile(
+            currentDir.getAbsolutePath());
+        assertNotNull("No image found in " + nameDir, imageFile);
+        assertEquals(NNStorage.getImageFileName(expectedTxId),
+                     imageFile.getName());
+        
+        // Started successfully. Shut it down and make sure it can restart.
+        cluster.shutdown();    
+        cluster = null;
+        
+        cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATA_NODES)
+        .format(false)
+        .build();
+        cluster.waitActive();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  public void testCrashRecoveryEmptyLogOneDir() throws Exception {
+    doTestCrashRecoveryEmptyLog(false);
+  }
+  
+  public void testCrashRecoveryEmptyLogBothDirs() throws Exception {
+    doTestCrashRecoveryEmptyLog(true);
+  }
+  
+  /**
+   * Test that the NN handles the corruption properly
+   * after it crashes just after creating an edit log
+   * (ie before writing START_LOG_SEGMENT). In the case
+   * that all logs have this problem, it should mark them
+   * as corrupt instead of trying to finalize them.
+   * 
+   * @param inBothDirs if true, there will be a truncated log in
+   * both of the edits directories. If false, the truncated log
+   * will only be in one of the directories. In both cases, the
+   * NN should fail to start up, because it's aware that txid 3
+   * was reached, but unable to find a non-corrupt log starting there.
+   */
+  private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(NUM_DATA_NODES).build();
+    cluster.shutdown();
+    
+    Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
+    for (URI uri : editsDirs) {
+      File dir = new File(uri.getPath());
+      File currentDir = new File(dir, "current");
+      // We should start with only the finalized edits_1-2
+      GenericTestUtils.assertGlobEquals(currentDir, "edits_.*",
+          NNStorage.getFinalizedEditsFileName(1, 2));
+      // Make a truncated edits_inprogress_3
+      File log = new File(currentDir,
+          NNStorage.getInProgressEditsFileName(3));
+      new EditLogFileOutputStream(log, 1024).create();
+      if (!inBothDirs) {
+        break;
+      }
+    }
     
     try {
-      spyLog.close();
-      spyLog.open();
-      fail("open did not fail even when all directories failed!");
-    } catch(IOException ioe) {
-      LOG.info("Got expected exception", ioe);
-    } finally {
-      spyLog.close();
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATA_NODES).format(false).build();
+      fail("Did not fail to start with all-corrupt logs");
+    } catch (IllegalStateException ise) {
+      GenericTestUtils.assertExceptionContains(
+          "No non-corrupt logs for txid 3", ise);
+    }
+    cluster.shutdown();
+  }
+
+  
+  private static class EditLogByteInputStream extends EditLogInputStream {
+    private InputStream input;
+    private long len;
+
+    public EditLogByteInputStream(byte[] data) {
+      len = data.length;
+      input = new ByteArrayInputStream(data);
+    }
+
+    public int available() throws IOException {
+      return input.available();
+    }
+    
+    public int read() throws IOException {
+      return input.read();
     }
     
-    // Reset and try it with a working open
-    Mockito.reset(spyLog);
-    spyImage.storage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
-    spyLog.open();
+    public long length() throws IOException {
+      return len;
+    }
     
-    // Close everything off
-    spyLog.close();
-    originalImage.close();
-    fsn.close();
+    public int read(byte[] b, int off, int len) throws IOException {
+      return input.read(b, off, len);
+    }
+
+    public void close() throws IOException {
+      input.close();
+    }
+
+    @Override // JournalStream
+    public String getName() {
+      return "AnonEditLogByteInputStream";
+    }
+
+    @Override // JournalStream
+    public JournalType getType() {
+      return JournalType.FILE;
+    }
   }
 
+  public void testFailedOpen() throws Exception {
+    File logDir = new File(TEST_DIR, "testFailedOpen");
+    logDir.mkdirs();
+    FSEditLog log = FSImageTestUtil.createStandaloneEditLog(logDir);
+    try {
+      logDir.setWritable(false);
+      log.open();
+      fail("Did no throw exception on only having a bad dir");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "no journals successfully started", ioe);
+    } finally {
+      logDir.setWritable(true);
+      log.close();
+    }
+  }
 }

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Fri Jul 29 16:28:45 2011
@@ -26,40 +26,64 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestEditLogFileOutputStream {
-
+  
+  private final static int HEADER_LEN = 17;
+  private static final File TEST_EDITS =
+    new File(System.getProperty("test.build.data","/tmp"),
+             "editLogStream.dat");
+
+  @Before
+  public void deleteEditsFile() {
+    TEST_EDITS.delete();
+  }
+  
   @Test
   public void testPreallocation() throws IOException {
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
         .build();
 
-    File editLog = cluster.getNameNode().getFSImage().getEditLog()
-        .getFsEditName();
-
-    assertEquals("Edit log should only be 4 bytes long",
-        4, editLog.length());
-    assertEquals("Edit log disk space used should be one block",
-        4096, new DU(editLog, conf).getUsed());
+    StorageDirectory sd = cluster.getNameNode().getFSImage()
+      .getStorage().getStorageDir(0);
+    File editLog = NNStorage.getInProgressEditsFile(sd, 1);
+
+    EditLogValidation validation = FSEditLogLoader.validateEditLog(editLog);
+    assertEquals("Edit log should contain a header as valid length",
+        HEADER_LEN, validation.validLength);
+    assertEquals(1, validation.numTransactions);
+    assertEquals("Edit log should have 1MB of bytes allocated",
+        1024*1024, editLog.length());
+    
 
     cluster.getFileSystem().mkdirs(new Path("/tmp"),
         new FsPermission((short)777));
 
-    assertEquals("Edit log should be 1MB + 4 bytes long",
-        (1024 * 1024) + 4, editLog.length());
-    // 256 blocks for the 1MB of preallocation space, 1 block for the original
-    // 4 bytes
+    long oldLength = validation.validLength;
+    validation = FSEditLogLoader.validateEditLog(editLog);
+    assertTrue("Edit log should have more valid data after writing a txn " +
+        "(was: " + oldLength + " now: " + validation.validLength + ")",
+        validation.validLength > oldLength);
+    assertEquals(2, validation.numTransactions);
+
+    assertEquals("Edit log should be 1MB long",
+        1024 * 1024, editLog.length());
+    // 256 blocks for the 1MB of preallocation space
     assertTrue("Edit log disk space used should be at least 257 blocks",
-        257 * 4096 <= new DU(editLog, conf).getUsed());
+        256 * 4096 <= new DU(editLog, conf).getUsed());
   }
   
   @Test
@@ -86,4 +110,48 @@ public class TestEditLogFileOutputStream
     assertEquals("fc was not nulled when elos.close() failed", elos.getFileChannelForTesting(), null);
   }
 
+  /**
+   * Tests EditLogFileOutputStream doesn't throw NullPointerException on
+   * close/abort sequence. See HDFS-2011.
+   */
+  @Test
+  public void testEditLogFileOutputStreamCloseAbort() throws IOException {
+    // abort after a close should just ignore
+    EditLogFileOutputStream editLogStream =
+      new EditLogFileOutputStream(TEST_EDITS, 0);
+    editLogStream.close();
+    editLogStream.abort();
+  }
+
+  /**
+   * Tests EditLogFileOutputStream doesn't throw NullPointerException on
+   * close/close sequence. See HDFS-2011.
+   */
+  @Test
+  public void testEditLogFileOutputStreamCloseClose() throws IOException {
+    // close after a close should result in an IOE
+    EditLogFileOutputStream editLogStream =
+      new EditLogFileOutputStream(TEST_EDITS, 0);
+    editLogStream.close();
+    try {
+      editLogStream.close();
+    } catch (IOException ioe) {
+      String msg = StringUtils.stringifyException(ioe);
+      assertTrue(msg, msg.contains("Trying to use aborted output stream"));
+    }
+  }
+  
+  /**
+   * Tests EditLogFileOutputStream doesn't throw NullPointerException on
+   * abort/abort sequence. See HDFS-2011.
+   */
+  @Test
+  public void testEditLogFileOutputStreamAbortAbort() throws IOException {
+    // abort after an abort should just ignore
+    EditLogFileOutputStream editLogStream =
+      new EditLogFileOutputStream(TEST_EDITS, 0);
+    editLogStream.abort();
+    editLogStream.abort();
+  }
+
 }

Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEditLogJournalFailures {
+
+  private int editsPerformed = 0;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private Runtime runtime;
+
+  /**
+   * Create the mini cluster for testing and sub in a custom runtime so that
+   * edit log journal failures don't actually cause the JVM to exit.
+   */
+  @Before
+  public void setUpMiniCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    
+    runtime = Runtime.getRuntime();
+    runtime = spy(runtime);
+    doNothing().when(runtime).exit(anyInt());
+    
+    cluster.getNameNode().getFSImage().getEditLog().setRuntimeForTesting(runtime);
+  }
+   
+  @After
+  public void shutDownMiniCluster() throws IOException {
+    fs.close();
+    cluster.shutdown();
+  }
+   
+  @Test
+  public void testSingleFailedEditsDirOnFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate one edits journal.
+    invalidateEditsDirAtIndex(0, true);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // A single journal failure should not result in a call to runtime.exit(...).
+    assertExitInvocations(0);
+    assertFalse(cluster.getNameNode().isInSafeMode());
+  }
+   
+  @Test
+  public void testAllEditsDirsFailOnFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate both edits journals.
+    invalidateEditsDirAtIndex(0, true);
+    invalidateEditsDirAtIndex(1, true);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // The previous edit could not be synced to any persistent storage, should
+    // have halted the NN.
+    assertExitInvocations(1);
+  }
+  
+  @Test
+  public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate one edits journal.
+    invalidateEditsDirAtIndex(0, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // A single journal failure should not result in a call to runtime.exit(...).
+    assertExitInvocations(0);
+    assertFalse(cluster.getNameNode().isInSafeMode());
+  }
+
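+  /**
+   * Replace the journal at index <code>index</code> with one that throws an
+   * exception on flush() or, when failOnFlush is false, on setReadyToFlush().
+   * @param index the index of the journal to take offline.
+   * @param failOnFlush true to fail on flush(), false on setReadyToFlush().
+   * @return the original <code>EditLogOutputStream</code> of the journal.
+   */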
+  private EditLogOutputStream invalidateEditsDirAtIndex(int index,
+      boolean failOnFlush) throws IOException {
+    FSImage fsimage = cluster.getNamesystem().getFSImage();
+    FSEditLog editLog = fsimage.getEditLog();
+    
+
+    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    EditLogFileOutputStream elos =
+      (EditLogFileOutputStream) jas.getCurrentStream();
+    EditLogFileOutputStream spyElos = spy(elos);
+    
+    if (failOnFlush) {
+      doThrow(new IOException("fail on flush()")).when(spyElos).flush();
+    } else {
+      doThrow(new IOException("fail on setReadyToFlush()")).when(spyElos)
+        .setReadyToFlush();
+    }
+    doNothing().when(spyElos).abort();
+     
+    jas.setCurrentStreamForTests(spyElos);
+     
+    return elos;
+  }
+
+  /**
+   * Restore the journal at index <code>index</code> with the passed
+   * {@link EditLogOutputStream}.
+   * 
+   * @param index index of the journal to restore.
+   * @param elos the {@link EditLogOutputStream} to put at that index.
+   */
+  private void restoreEditsDirAtIndex(int index, EditLogOutputStream elos) {
+    FSImage fsimage = cluster.getNamesystem().getFSImage();
+    FSEditLog editLog = fsimage.getEditLog();
+
+    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    jas.setCurrentStreamForTests(elos);
+  }
+
+  /**
+   * Do a mutative metadata operation on the file system.
+   * 
+   * @return true if the operation was successful, false otherwise.
+   */
+  private boolean doAnEdit() throws IOException {
+    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+  }
+
+  /**
+   * Make sure that Runtime.exit(...) has been called
+   * <code>expectedExits</code> number of times.
+   * 
+   * @param expectedExits the number of times Runtime.exit(...) should have been called.
+   */
+  private void assertExitInvocations(int expectedExits) {
+    verify(runtime, times(expectedExits)).exit(anyInt());
+  }
+}

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Fri Jul 29 16:28:45 2011
@@ -18,11 +18,12 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.commons.logging.Log;
+
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 
 import java.io.*;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
@@ -40,8 +41,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 
 import static org.junit.Assert.*;
 import org.junit.Test;
@@ -55,6 +55,10 @@ import static org.mockito.Mockito.*;
  * and namespace saving.
  */
 public class TestEditLogRace {
+  static {
+    ((Log4JLogger)FSEditLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
 
   private static final String NAME_DIR =
@@ -181,27 +185,29 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
-      editLog.close();
-      editLog.open();
+      StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
 
       startTransactionWorkers(namesystem, caughtErr);
 
+      long previousLogTxId = 1;
+
       for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) {
         try {
           Thread.sleep(20);
         } catch (InterruptedException e) {}
 
         LOG.info("Starting roll " + i + ".");
-        editLog.rollEditLog();
-        LOG.info("Roll complete " + i + ".");
-
-        verifyEditLogs(namesystem, fsimage);
-
-        LOG.info("Starting purge " + i + ".");
-        editLog.purgeEditLog();
-        LOG.info("Complete purge " + i + ".");
+        CheckpointSignature sig = namesystem.rollEditLog();
+        
+        long nextLog = sig.curSegmentTxId;
+        String logFileName = NNStorage.getFinalizedEditsFileName(
+            previousLogTxId, nextLog - 1);
+        previousLogTxId += verifyEditLogs(namesystem, fsimage, logFileName, previousLogTxId);
+
+        assertEquals(previousLogTxId, nextLog);
+        
+        File expectedLog = NNStorage.getInProgressEditsFile(sd, previousLogTxId);
+        assertTrue("Expect " + expectedLog + " to exist", expectedLog.exists());
       }
     } finally {
       stopTransactionWorkers();
@@ -214,19 +220,32 @@ public class TestEditLogRace {
     }
   }
 
-  private void verifyEditLogs(FSNamesystem namesystem, FSImage fsimage)
+  private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, 
+                              String logFileName, long startTxId)
     throws IOException {
+    
+    long numEdits = -1;
+    
     // Verify that we can read in all the transactions that we have written.
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.
-    for (Iterator<StorageDirectory> it = 
-           fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
+    for (StorageDirectory sd :
+      fsimage.getStorage().dirIterable(NameNodeDirType.EDITS)) {
+
+      File editFile = new File(sd.getCurrentDir(), logFileName);
+        
       System.out.println("Verifying file: " + editFile);
-      int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
-        new EditLogFileInputStream(editFile));
-      System.out.println("Number of edits: " + numEdits);
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+      int numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), 
+          startTxId);
+      
+      System.out.println("Number of edits: " + numEditsThisLog);
+      assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
+      numEdits = numEditsThisLog;
     }
+
+    assertTrue(numEdits != -1);
+    return numEdits;
   }
 
   /**
@@ -249,11 +268,6 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
-      editLog.close();
-      editLog.open();
-
       startTransactionWorkers(namesystem, caughtErr);
 
       for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
@@ -266,14 +280,28 @@ public class TestEditLogRace {
         namesystem.enterSafeMode(false);
 
         // Verify edit logs before the save
-        verifyEditLogs(namesystem, fsimage);
+        // They should start with the first edit after the checkpoint
+        long logStartTxId = fsimage.getStorage().getMostRecentCheckpointTxId() + 1; 
+        verifyEditLogs(namesystem, fsimage,
+            NNStorage.getInProgressEditsFileName(logStartTxId),
+            logStartTxId);
+
 
         LOG.info("Save " + i + ": saving namespace");
         namesystem.saveNamespace();
         LOG.info("Save " + i + ": leaving safemode");
 
-        // Verify that edit logs post save are also not corrupt
-        verifyEditLogs(namesystem, fsimage);
+        long savedImageTxId = fsimage.getStorage().getMostRecentCheckpointTxId();
+        
+        // Verify that edit logs post save got finalized and aren't corrupt
+        verifyEditLogs(namesystem, fsimage,
+            NNStorage.getFinalizedEditsFileName(logStartTxId, savedImageTxId),
+            logStartTxId);
+        
+        // The checkpoint id should be 1 less than the last written ID, since
+        // the log roll writes the "BEGIN" transaction to the new log.
+        assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(),
+                     editLog.getLastWrittenTxId() - 1);
 
         namesystem.leaveSafeMode(false);
         LOG.info("Save " + i + ": complete");
@@ -328,9 +356,10 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
-      EditLogOutputStream spyElos = spy(streams.get(0));
-      streams.set(0, spyElos);
+      FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+      EditLogFileOutputStream spyElos =
+          spy((EditLogFileOutputStream)jas.getCurrentStream());
+      jas.setCurrentStreamForTests(spyElos);
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();
@@ -393,7 +422,14 @@ public class TestEditLogRace {
       doAnEditThread.join();
       assertNull(deferredException.get());
 
-      verifyEditLogs(namesystem, fsimage);
+      // We did 3 edits: begin, txn, and end
+      assertEquals(3, verifyEditLogs(namesystem, fsimage,
+          NNStorage.getFinalizedEditsFileName(1, 3),
+          1));
+      // after the save, just the one "begin"
+      assertEquals(1, verifyEditLogs(namesystem, fsimage,
+          NNStorage.getInProgressEditsFileName(4),
+          4));
     } finally {
       LOG.info("Closing namesystem");
       if(namesystem != null) namesystem.close();
@@ -478,7 +514,14 @@ public class TestEditLogRace {
       doAnEditThread.join();
       assertNull(deferredException.get());
 
-      verifyEditLogs(namesystem, fsimage);
+      // We did 3 edits: begin, txn, and end
+      assertEquals(3, verifyEditLogs(namesystem, fsimage,
+          NNStorage.getFinalizedEditsFileName(1, 3),
+          1));
+      // after the save, just the one "begin"
+      assertEquals(1, verifyEditLogs(namesystem, fsimage,
+          NNStorage.getInProgressEditsFileName(4),
+          4));
     } finally {
       LOG.info("Closing namesystem");
       if(namesystem != null) namesystem.close();

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Fri Jul 29 16:28:45 2011
@@ -20,9 +20,14 @@ package org.apache.hadoop.hdfs.server.na
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.SortedMap;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,10 +36,25 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
 public class TestFSEditLogLoader {
   
+  static {
+    ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data","build/test/data"));
+
   private static final int NUM_DATA_NODES = 0;
   
   @Test
@@ -50,14 +70,15 @@ public class TestFSEditLogLoader {
     final FSNamesystem namesystem = cluster.getNamesystem();
 
     FSImage fsimage = namesystem.getFSImage();
-    final FSEditLog editLog = fsimage.getEditLog();
     for (int i = 0; i < 20; i++) {
       fileSys.mkdirs(new Path("/tmp/tmp" + i));
     }
-    File editFile = editLog.getFsEditName();
-    editLog.close();
+    StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
     cluster.shutdown();
-    
+
+    File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile();
+    assertTrue("Should exist: " + editFile, editFile.exists());
+
     // Corrupt the edits file.
     long fileLen = editFile.length();
     RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
@@ -124,4 +145,168 @@ public class TestFSEditLogLoader {
       }
     }
   }
+  
+  /**
+   * Test that the valid number of transactions can be counted from a file.
+   * @throws IOException 
+   */
+  @Test
+  public void testCountValidTransactions() throws IOException {
+    File testDir = new File(TEST_DIR, "testCountValidTransactions");
+    File logFile = new File(testDir,
+        NNStorage.getInProgressEditsFileName(1));
+    
+    // Create a log file, and record the offsets at which each
+    // transaction starts.
+    FSEditLog fsel = null;
+    final int NUM_TXNS = 30;
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    try {
+      fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
+      fsel.open();
+      assertTrue("should exist: " + logFile, logFile.exists());
+      
+      for (int i = 0; i < NUM_TXNS; i++) {
+        long trueOffset = getNonTrailerLength(logFile);
+        long thisTxId = fsel.getLastWrittenTxId() + 1;
+        offsetToTxId.put(trueOffset, thisTxId);
+        System.err.println("txid " + thisTxId + " at offset " + trueOffset);
+        fsel.logDelete("path" + i, i);
+        fsel.logSync();
+      }
+    } finally {
+      if (fsel != null) {
+        fsel.close();
+      }
+    }
+
+    // The file got renamed when the log was closed.
+    logFile = testDir.listFiles()[0];
+    long validLength = getNonTrailerLength(logFile);
+
+    // Make sure that uncorrupted log has the expected length and number
+    // of transactions.
+    EditLogValidation validation = FSEditLogLoader.validateEditLog(logFile);
+    assertEquals(NUM_TXNS + 2, validation.numTransactions);
+    assertEquals(validLength, validation.validLength);
+    
+    // Back up the uncorrupted log
+    File logFileBak = new File(testDir, logFile.getName() + ".bak");
+    Files.copy(logFile, logFileBak);
+
+    // Corrupt the log file in various ways for each txn
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txid = entry.getValue();
+      
+      // Restore backup, truncate the file exactly before the txn
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when truncating to length " + txOffset,
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, truncate the file one byte into the txn; the
+      // resulting partial txn isn't valid either
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset + 1);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when truncating to length " + (txOffset + 1),
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, corrupt the txn opcode
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when corrupting txn opcode at " + txOffset,
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, corrupt a byte a few bytes into the txn
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset+5);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when corrupting txn data at " + (txOffset+5),
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+    }
+    
+    // Corrupt the log at every offset to make sure that validation itself
+    // never throws an exception, and that the number of valid transactions
+    // never decreases as the corruption moves further into the file
+    long prevNumValid = 0;
+    for (long offset = 0; offset < validLength; offset++) {
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, offset);
+      EditLogValidation val = FSEditLogLoader.validateEditLog(logFile);
+      assertTrue(val.numTransactions >= prevNumValid);
+      prevNumValid = val.numTransactions;
+    }
+  }
+
+  /**
+   * Corrupt the byte at the given offset in the given file,
+   * by subtracting 1 from it.
+   */
+  private void corruptByteInFile(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    try {
+      raf.seek(offset);
+      int origByte = raf.read();
+      raf.seek(offset);
+      raf.writeByte(origByte - 1);
+    } finally {
+      IOUtils.closeStream(raf);
+    }
+  }
+
+  /** Truncate the given file to the given length, always closing it. */
+  private void truncateFile(File logFile, long newLength) throws IOException {
+    RandomAccessFile raf = new RandomAccessFile(logFile, "rw");
+    try {
+      raf.setLength(newLength);
+    } finally {
+      raf.close();
+    }
+  }
+
+  /**
+   * Return the length of bytes in the given file after subtracting
+   * the trailer of 0xFF (OP_INVALID)s.
+   * This seeks to the end of the file and reads chunks backwards until
+   * it finds a non-0xFF byte.
+   * @throws IOException if the file cannot be read
+   */
+  private static long getNonTrailerLength(File f) throws IOException {
+    final int chunkSizeToRead = 256*1024;
+    FileInputStream fis = new FileInputStream(f);
+    try {
+      
+      byte buf[] = new byte[chunkSizeToRead];
+  
+      FileChannel fc = fis.getChannel();
+      long size = fc.size();
+      long pos = size - (size % chunkSizeToRead);
+      
+      while (pos >= 0) {
+        fc.position(pos);
+  
+        int readLen = (int) Math.min(size - pos, chunkSizeToRead);
+        IOUtils.readFully(fis, buf, 0, readLen);
+        for (int i = readLen - 1; i >= 0; i--) {
+          if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+            return pos + i + 1; // + 1 since we count this byte!
+          }
+        }
+        
+        pos -= chunkSizeToRead;
+      }
+      return 0;
+    } finally {
+      fis.close();
+    }
+  }
 }

Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
+
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup;
+import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFSImageStorageInspector {
+  private static final Log LOG = LogFactory.getLog(
+      TestFSImageStorageInspector.class);
+
+  /**
+   * Simple test with image, edits, and inprogress edits
+   */
+  @Test
+  public void testCurrentStorageInspector() throws IOException {
+    FSImageTransactionalStorageInspector inspector = 
+        new FSImageTransactionalStorageInspector();
+    
+    StorageDirectory mockDir = mockDirectory(
+        NameNodeDirType.IMAGE_AND_EDITS,
+        false,
+        "/foo/current/" + getImageFileName(123),
+        "/foo/current/" + getFinalizedEditsFileName(123, 456),
+        "/foo/current/" + getImageFileName(456),
+        "/foo/current/" + getInProgressEditsFileName(457));
+
+    inspector.inspectDirectory(mockDir);
+    mockLogValidation(inspector,
+        "/foo/current/" + getInProgressEditsFileName(457), 10);
+    
+    assertEquals(2, inspector.foundEditLogs.size());
+    assertEquals(2, inspector.foundImages.size());
+    assertTrue(inspector.foundEditLogs.get(1).isInProgress());
+    
+    FoundFSImage latestImage = inspector.getLatestImage();
+    assertEquals(456, latestImage.txId);
+    assertSame(mockDir, latestImage.sd);
+    assertTrue(inspector.isUpgradeFinalized());
+    
+    LoadPlan plan = inspector.createLoadPlan();
+    LOG.info("Plan: " + plan);
+    
+    assertEquals(new File("/foo/current/"+getImageFileName(456)), 
+                 plan.getImageFile());
+    assertArrayEquals(new File[] {
+        new File("/foo/current/" + getInProgressEditsFileName(457)) },
+        plan.getEditsFiles().toArray(new File[0]));
+  }
+  
+  /**
+   * Test that we check for gaps in txids when devising a load plan.
+   */
+  @Test
+  public void testPlanWithGaps() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    
+    StorageDirectory mockDir = mockDirectory(
+        NameNodeDirType.IMAGE_AND_EDITS,
+        false,
+        "/foo/current/" + getImageFileName(123),
+        "/foo/current/" + getImageFileName(456),
+        "/foo/current/" + getFinalizedEditsFileName(457,900),
+        "/foo/current/" + getFinalizedEditsFileName(901,950),
+        "/foo/current/" + getFinalizedEditsFileName(952,1000)); // <-- missing edit 951!
+
+    inspector.inspectDirectory(mockDir);
+    try {
+      inspector.createLoadPlan();
+      fail("Didn't throw IOE trying to load with gaps in edits");
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().contains(
+          "would start at txid 951 but starts at txid 952"));
+    }
+  }
+  
+  /**
+   * Test the case where an in-progress log comes in the middle of a sequence
+   * of logs
+   */
+  @Test
+  public void testPlanWithInProgressInMiddle() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    
+    StorageDirectory mockDir = mockDirectory(
+        NameNodeDirType.IMAGE_AND_EDITS,
+        false,
+        "/foo/current/" + getImageFileName(123),
+        "/foo/current/" + getImageFileName(456),
+        "/foo/current/" + getFinalizedEditsFileName(457,900),
+        "/foo/current/" + getInProgressEditsFileName(901), // <-- inprogress in middle
+        "/foo/current/" + getFinalizedEditsFileName(952,1000));
+
+    inspector.inspectDirectory(mockDir);
+    mockLogValidation(inspector,
+        "/foo/current/" + getInProgressEditsFileName(901), 51);
+
+    LoadPlan plan = inspector.createLoadPlan();
+    LOG.info("Plan: " + plan);
+    
+    assertEquals(new File("/foo/current/" + getImageFileName(456)), 
+                 plan.getImageFile());
+    assertArrayEquals(new File[] {
+        new File("/foo/current/" + getFinalizedEditsFileName(457,900)),
+        new File("/foo/current/" + getInProgressEditsFileName(901)),
+        new File("/foo/current/" + getFinalizedEditsFileName(952,1000)) },
+        plan.getEditsFiles().toArray(new File[0]));
+
+  }
+
+  
+  /**
+   * Test case for the usual case where no recovery of a log group is necessary
+   * (i.e. all logs have the same start and end txids and are finalized)
+   */
+  @Test
+  public void testLogGroupRecoveryNoop() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+
+    inspector.inspectDirectory(
+        mockDirectoryWithEditLogs("/foo1/current/" 
+                                  + getFinalizedEditsFileName(123,456)));
+    inspector.inspectDirectory(
+        mockDirectoryWithEditLogs("/foo2/current/"
+                                  + getFinalizedEditsFileName(123,456)));
+    inspector.inspectDirectory(
+        mockDirectoryWithEditLogs("/foo3/current/"
+                                  + getFinalizedEditsFileName(123,456)));
+    LogGroup lg = inspector.logGroups.get(123L);
+    assertEquals(3, lg.logs.size());
+    
+    lg.planRecovery();
+    
+    assertFalse(lg.logs.get(0).isCorrupt());
+    assertFalse(lg.logs.get(1).isCorrupt());
+    assertFalse(lg.logs.get(2).isCorrupt());
+  }
+  
+  /**
+   * One starting txid has both finalized and in-progress logs: the
+   * in-progress copy is marked corrupt and moved aside during recovery.
+   */
+  @Test
+  public void testLogGroupRecoveryMixed() throws IOException {
+    String finalizedName = getFinalizedEditsFileName(123, 456);
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+
+    // Two directories hold the finalized segment, the third an open one.
+    inspector.inspectDirectory(
+        mockDirectoryWithEditLogs("/foo1/current/" + finalizedName));
+    inspector.inspectDirectory(
+        mockDirectoryWithEditLogs("/foo2/current/" + finalizedName));
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo3/current/" + getInProgressEditsFileName(123)));
+    // A fourth, image-only directory with an image at txid 122.
+    StorageDirectory imageDir = mockDirectory(
+        NameNodeDirType.IMAGE, false,
+        "/foo4/current/" + getImageFileName(122));
+    inspector.inspectDirectory(imageDir);
+
+    LogGroup group = inspector.logGroups.get(123L);
+    assertEquals(3, group.logs.size());
+    FoundEditLog unfinalized = group.logs.get(2);
+    assertTrue(unfinalized.isInProgress());
+
+    LoadPlan plan = inspector.createLoadPlan();
+
+    // Planning should have flagged only the in-progress copy as corrupt.
+    assertFalse(group.logs.get(0).isCorrupt());
+    assertFalse(group.logs.get(1).isCorrupt());
+    assertTrue(group.logs.get(2).isCorrupt());
+
+    // Swap in a spy with the actual move stubbed out, then check that
+    // recovery asks for the corrupt log to be moved aside.
+    FoundEditLog spiedLog = spy(unfinalized);
+    Mockito.doNothing().when(spiedLog).moveAsideCorruptFile();
+    group.logs.set(2, spiedLog);
+
+    plan.doRecovery();
+
+    Mockito.verify(spiedLog).moveAsideCorruptFile();
+  }
+  
+  /**
+   * Finalized logs that share a start txid but end at different txids:
+   * planning recovery is expected to fail with an IOException.
+   */
+  @Test
+  public void testLogGroupRecoveryInconsistentEndTxIds() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    // Both segments begin at txid 123 but disagree on the last txid.
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo1/current/" + getFinalizedEditsFileName(123, 456)));
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo2/current/" + getFinalizedEditsFileName(123, 678)));
+
+    LogGroup group = inspector.logGroups.get(123L);
+    assertEquals(2, group.logs.size());
+
+    try {
+      group.planRecovery();
+      fail("Didn't throw IOE on inconsistent end txids");
+    } catch (IOException expected) {
+      assertTrue(expected.getMessage().contains("More than one ending txid"));
+    }
+  }
+
+  /**
+   * Only in-progress logs exist for a txid, so they are synchronized on
+   * their valid length: the log with fewer valid txns is treated as corrupt.
+   */
+  @Test
+  public void testLogGroupRecoveryInProgress() throws IOException {
+    String[] paths = {
+        "/foo1/current/" + getInProgressEditsFileName(123),
+        "/foo2/current/" + getInProgressEditsFileName(123),
+        "/foo3/current/" + getInProgressEditsFileName(123)
+    };
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    for (String path : paths) {
+      inspector.inspectDirectory(mockDirectoryWithEditLogs(path));
+    }
+
+    // Swap in spies that report the valid txn counts we want to see.
+    mockLogValidation(inspector, paths[0], 2000);
+    mockLogValidation(inspector, paths[1], 2000);
+    mockLogValidation(inspector, paths[2], 1000);
+
+    LogGroup group = inspector.logGroups.get(123L);
+    assertEquals(3, group.logs.size());
+
+    group.planRecovery();
+
+    // Only the shorter third log should have been marked corrupt.
+    assertFalse(group.logs.get(0).isCorrupt());
+    assertFalse(group.logs.get(1).isCorrupt());
+    assertTrue(group.logs.get(2).isCorrupt());
+
+    // Stub out the file operations that recover() should trigger.
+    FoundEditLog shortLog = group.logs.get(2);
+    Mockito.doNothing().when(shortLog).moveAsideCorruptFile();
+    Mockito.doNothing().when(group.logs.get(0)).finalizeLog();
+    Mockito.doNothing().when(group.logs.get(1)).finalizeLog();
+
+    group.recover();
+
+    Mockito.verify(shortLog).moveAsideCorruptFile();
+    Mockito.verify(group.logs.get(0)).finalizeLog();
+    Mockito.verify(group.logs.get(1)).finalizeLog();
+  }
+
+  /**
+   * Swap the inspected log at the given path for a spy whose validation
+   * reports numValidTransactions transactions; fail if no log matches.
+   */
+  private void mockLogValidation(
+      FSImageTransactionalStorageInspector inspector,
+      String path, int numValidTransactions) throws IOException {
+    for (LogGroup group : inspector.logGroups.values()) {
+      List<FoundEditLog> candidates = group.logs;
+      for (int idx = 0; idx < candidates.size(); idx++) {
+        FoundEditLog candidate = candidates.get(idx);
+        if (!candidate.file.getPath().equals(path)) {
+          continue;
+        }
+        // Stub validateLog() on a spy, then put the spy into the group.
+        FoundEditLog stubbed = spy(candidate);
+        doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions))
+          .when(stubbed).validateLog();
+        candidates.set(idx, stubbed);
+        return;
+      }
+    }
+    fail("No log found to mock out at " + path);
+  }
+
+  /**
+   * Images and edits live in separate storage directories: the load plan
+   * should pick the newest image plus only the edits that follow it.
+   */
+  @Test
+  public void testCurrentSplitEditsAndImage() throws IOException {
+    String inProgressEdits =
+        "/foo3/current/" + getInProgressEditsFileName(457);
+
+    StorageDirectory olderImageDir = mockDirectory(NameNodeDirType.IMAGE,
+        false, "/foo/current/" + getImageFileName(123));
+    StorageDirectory newerImageDir = mockDirectory(NameNodeDirType.IMAGE,
+        false, "/foo2/current/" + getImageFileName(456));
+    StorageDirectory editsDir = mockDirectory(NameNodeDirType.EDITS,
+        false, "/foo3/current/" + getFinalizedEditsFileName(123, 456),
+        inProgressEdits);
+
+    // Inspection order: older image, then edits, then newer image.
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    inspector.inspectDirectory(olderImageDir);
+    inspector.inspectDirectory(editsDir);
+    inspector.inspectDirectory(newerImageDir);
+
+    // Have the in-progress log validate as holding two transactions.
+    mockLogValidation(inspector, inProgressEdits, 2);
+
+    assertEquals(2, inspector.foundEditLogs.size());
+    assertEquals(2, inspector.foundImages.size());
+    assertTrue(inspector.foundEditLogs.get(1).isInProgress());
+    assertTrue(inspector.isUpgradeFinalized());
+
+    // The plan should load the image at txid 456, followed only by the
+    // log that starts at txid 457.
+    TransactionalLoadPlan plan =
+      (TransactionalLoadPlan)inspector.createLoadPlan();
+    FoundFSImage chosen = plan.image;
+    assertEquals(456, chosen.txId);
+    assertSame(newerImageDir, chosen.sd);
+    assertEquals(new File("/foo2/current/" + getImageFileName(456)),
+                 plan.getImageFile());
+    File[] expectedEdits = { new File(inProgressEdits) };
+    assertArrayEquals(expectedEdits,
+        plan.getEditsFiles().toArray(new File[0]));
+
+    // The manifest should list only the finalized segment, and only when
+    // the requested txid falls inside it.
+    assertEquals("[[123,456]]", inspector.getEditLogManifest(123).toString());
+    assertEquals("[[123,456]]", inspector.getEditLogManifest(456).toString());
+    assertEquals("[]", inspector.getEditLogManifest(457).toString());
+  }
+  
+  /** Checks manifests built from logs spread over three directories. */
+  @Test
+  public void testLogManifest() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    // foo1 and foo3 list the same finalized segments; foo2 does not.
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo1/current/" + getFinalizedEditsFileName(1, 1),
+        "/foo1/current/" + getFinalizedEditsFileName(2, 200)));
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo2/current/" + getInProgressEditsFileName(1),
+        "/foo2/current/" + getFinalizedEditsFileName(201, 400)));
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        "/foo3/current/" + getFinalizedEditsFileName(1, 1),
+        "/foo3/current/" + getFinalizedEditsFileName(2, 200)));
+
+    String fromTxId1 = inspector.getEditLogManifest(1).toString();
+    assertEquals("[[1,1], [2,200], [201,400]]", fromTxId1);
+
+    // Txids 2 and 10 both fall within the [2,200] segment.
+    String fromTxId2 = inspector.getEditLogManifest(2).toString();
+    assertEquals("[[2,200], [201,400]]", fromTxId2);
+    String fromTxId10 = inspector.getEditLogManifest(10).toString();
+    assertEquals("[[2,200], [201,400]]", fromTxId10);
+
+    // Asking from txid 201 should leave only the last segment.
+    String fromTxId201 = inspector.getEditLogManifest(201).toString();
+    assertEquals("[[201,400]]", fromTxId201);
+  }
+
+  /**
+   * Regression test: an in-progress log (txid 2626) sits in an earlier
+   * name directory than the finalized log for the same txid. Previously,
+   * getEditLogManifest wouldn't see this log.
+   */
+  @Test
+  public void testLogManifestInProgressComesFirst() throws IOException {
+    String firstDir = "/foo1/current/";
+    String secondDir = "/foo2/current/";
+
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+
+    // The first directory ends with an in-progress log at txid 2626...
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        firstDir + getFinalizedEditsFileName(2622, 2623),
+        firstDir + getFinalizedEditsFileName(2624, 2625),
+        firstDir + getInProgressEditsFileName(2626)));
+    // ...whereas the second has finalized logs from txid 2626 onwards.
+    inspector.inspectDirectory(mockDirectoryWithEditLogs(
+        secondDir + getFinalizedEditsFileName(2622, 2623),
+        secondDir + getFinalizedEditsFileName(2624, 2625),
+        secondDir + getFinalizedEditsFileName(2626, 2627),
+        secondDir + getFinalizedEditsFileName(2628, 2629)));
+
+    String manifest = inspector.getEditLogManifest(2621).toString();
+    assertEquals("[[2622,2623], [2624,2625], [2626,2627], [2628,2629]]",
+                 manifest);
+  }
+  
+  private StorageDirectory mockDirectoryWithEditLogs(String... fileNames) {
+    return mockDirectory(NameNodeDirType.EDITS, /* previousExists= */ false, fileNames);
+  }
+  
+  /**
+   * Build a mock storage directory whose current/ dir lists the given files.
+   * @param type type of storage dir to report
+   * @param previousExists whether the mocked previous/ dir should exist
+   * @param fileNames paths returned when current/ is listed
+   * @return the mocked storage directory
+   */
+  static StorageDirectory mockDirectory(
+      StorageDirType type,
+      boolean previousExists,
+      String...  fileNames) {
+    // The listing that the fake 'current' directory will hand back
+    File[] listing = new File[fileNames.length];
+    for (int idx = 0; idx < listing.length; idx++) {
+      listing[idx] = new File(fileNames[idx]);
+    }
+    // Spy on a real File so that only listFiles() is overridden
+    File currentDir = Mockito.spy(new File("/dir/current"));
+    doReturn(listing).when(currentDir).listFiles();
+
+    StorageDirectory mockSd = mock(StorageDirectory.class);
+    doReturn(type).when(mockSd).getStorageDirType();
+    doReturn(currentDir).when(mockSd).getCurrentDir();
+
+    // The version file always exists
+    doReturn(FSImageTestUtil.mockFile(true)).when(mockSd).getVersionFile();
+
+    // The previous dir exists only if the caller asked for it
+    doReturn(FSImageTestUtil.mockFile(previousExists))
+      .when(mockSd).getPreviousDir();
+
+    return mockSd;
+  }
+}

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java Fri Jul 29 16:28:45 2011
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyOb
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,7 +62,7 @@ public class TestFsLimits {
   
   private static class TestFSDirectory extends FSDirectory {
     public TestFSDirectory() throws IOException {
-      super(new FSImage(), getMockNamesystem(), conf);
+      super(new FSImage(conf), getMockNamesystem(), conf);
       setReady(fsIsReady);
     }
     
@@ -71,8 +74,12 @@ public class TestFsLimits {
   }
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+             fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
+                                "namenode")).toString());
+
     rootInode = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, perms, 0L, 0L);
     inodes = new INode[]{ rootInode, null };
     fs = null;

Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+import static org.apache.hadoop.test.GenericTestUtils.assertGlobEquals;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
+import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
+
+
+/**
+ * Functional tests for NNStorageRetentionManager. This differs from
+ * {@link TestNNStorageRetentionManager} in that the other test suite
+ * is only unit/mock-based tests whereas this suite starts miniclusters,
+ * etc.
+ */
+public class TestNNStorageRetentionFunctional {
+  private static final File TEST_ROOT_DIR =
+    new File(MiniDFSCluster.getBaseDirectory());
+  private static final Log LOG = LogFactory.getLog(
+      TestNNStorageRetentionFunctional.class);
+
+  /**
+   * Test case where two directories are configured as NAME_AND_EDITS
+   * and one of them fails to save storage. Since the edits and image
+   * failure states are decoupled, the failure of image saving should
+   * not prevent the purging of logs from that dir.
+   */
+  @Test
+  public void testPurgingWithNameEditsDirAfterFailure()
+      throws IOException {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+
+    File sd0 = new File(TEST_ROOT_DIR, "nn0");
+    File sd1 = new File(TEST_ROOT_DIR, "nn1");
+    File cd0 = new File(sd0, "current");
+    File cd1 = new File(sd1, "current");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        Joiner.on(",").join(sd0, sd1));
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .manageNameDfsDirs(false)
+        .format(true).build();
+
+      NameNode nn = cluster.getNameNode();
+
+      doSaveNamespace(nn);
+      LOG.info("After first save, images 0 and 2 should exist in both dirs");
+      assertGlobEquals(cd0, "fsimage_\\d*", 
+          getImageFileName(0), getImageFileName(2));
+      assertGlobEquals(cd1, "fsimage_\\d*",
+          getImageFileName(0), getImageFileName(2));
+      assertGlobEquals(cd0, "edits_.*",
+          getFinalizedEditsFileName(1, 2),
+          getInProgressEditsFileName(3));
+      assertGlobEquals(cd1, "edits_.*",
+          getFinalizedEditsFileName(1, 2),
+          getInProgressEditsFileName(3));
+
+      doSaveNamespace(nn);
+      LOG.info("After second save, image 0 should be purged, " +
+          "and image 4 should exist in both.");
+      assertGlobEquals(cd0, "fsimage_\\d*",
+          getImageFileName(2), getImageFileName(4));
+      assertGlobEquals(cd1, "fsimage_\\d*",
+          getImageFileName(2), getImageFileName(4));
+      assertGlobEquals(cd0, "edits_.*",
+          getFinalizedEditsFileName(3, 4),
+          getInProgressEditsFileName(5));
+      assertGlobEquals(cd1, "edits_.*",
+          getFinalizedEditsFileName(3, 4),
+          getInProgressEditsFileName(5));
+
+      LOG.info("Failing first storage dir by chmodding it");
+      sd0.setExecutable(false);
+      doSaveNamespace(nn);
+      LOG.info("Restoring accessibility of first storage dir");
+      sd0.setExecutable(true);
+
+      LOG.info("nothing should have been purged in first storage dir");
+      assertGlobEquals(cd0, "fsimage_\\d*",
+          getImageFileName(2), getImageFileName(4));
+      assertGlobEquals(cd0, "edits_.*",
+          getFinalizedEditsFileName(3, 4),
+          getInProgressEditsFileName(5));
+
+      LOG.info("fsimage_2 should be purged in second storage dir");
+      assertGlobEquals(cd1, "fsimage_\\d*",
+          getImageFileName(4), getImageFileName(6));
+      assertGlobEquals(cd1, "edits_.*",
+          getFinalizedEditsFileName(5, 6),
+          getInProgressEditsFileName(7));
+
+      LOG.info("On next save, we should purge logs from the failed dir," +
+          " but not images, since the image directory is in failed state.");
+      doSaveNamespace(nn);
+      assertGlobEquals(cd1, "fsimage_\\d*",
+          getImageFileName(6), getImageFileName(8));
+      assertGlobEquals(cd1, "edits_.*",
+          getFinalizedEditsFileName(7, 8),
+          getInProgressEditsFileName(9));
+      assertGlobEquals(cd0, "fsimage_\\d*",
+          getImageFileName(2), getImageFileName(4));
+      assertGlobEquals(cd0, "edits_.*",
+          getInProgressEditsFileName(9));
+    } finally {
+      // Restore access even if the test failed while the dir was locked.
+      sd0.setExecutable(true);
+      LOG.info("Shutting down...");
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /** Saves the namespace, entering safe mode before and leaving it after. */
+  private static void doSaveNamespace(NameNode nn) throws IOException {
+    LOG.info("Saving namespace...");
+    nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    nn.saveNamespace();
+    nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+  }
+}