Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/08/30 01:13:13 UTC

svn commit: r690418 [1/2] - in /hadoop/core/trunk: ./ conf/ src/hdfs/org/apache/hadoop/hdfs/server/common/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/a...

Author: shv
Date: Fri Aug 29 16:13:12 2008
New Revision: 690418

URL: http://svn.apache.org/viewvc?rev=690418&view=rev
Log:
HADOOP-3948. Separate name-node edits and fsimage directories. Contributed by Lohit Vijayarenu.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 29 16:13:12 2008
@@ -111,6 +111,9 @@
     HADOOP-3828. Provides a way to write skipped records to DFS.
     (Sharad Agarwal via ddas)
 
+    HADOOP-3948. Separate name-node edits and fsimage directories.
+    (Lohit Vijayarenu via shv)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.
@@ -626,7 +629,7 @@
     (rangadi)
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
-    (Lohit Vjayarenu via rangadi)
+    (Lohit Vijayarenu via rangadi)
 
     HADOOP-3130. Make the connect timeout smaller for getFile.
     (Amar Ramesh Kamat via ddas)

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Aug 29 16:13:12 2008
@@ -242,13 +242,24 @@
   <name>fs.checkpoint.dir</name>
   <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
   <description>Determines where on the local filesystem the DFS secondary
-      name node should store the temporary images and edits to merge.
+      name node should store the temporary images to merge.
       If this is a comma-delimited list of directories then the image is
       replicated in all of the directories for redundancy.
   </description>
 </property>
 
 <property>
+  <name>fs.checkpoint.edits.dir</name>
+  <value>${fs.checkpoint.dir}</value>
+  <description>Determines where on the local filesystem the DFS secondary
+      name node should store the temporary edits to merge.
+      If this is a comma-delimited list of directories then the edits are
+      replicated in all of the directories for redundancy.
+      The default value is the same as fs.checkpoint.dir.
+  </description>
+</property>
+
+<property>
   <name>fs.checkpoint.period</name>
   <value>3600</value>
   <description>The number of seconds between two periodic checkpoints.
@@ -380,12 +391,21 @@
   <name>dfs.name.dir</name>
   <value>${hadoop.tmp.dir}/dfs/name</value>
   <description>Determines where on the local filesystem the DFS name node
-      should store the name table.  If this is a comma-delimited list
+      should store the name table (fsimage).  If this is a comma-delimited list
       of directories then the name table is replicated in all of the
       directories, for redundancy. </description>
 </property>
 
 <property>
+  <name>dfs.name.edits.dir</name>
+  <value>${dfs.name.dir}</value>
+  <description>Determines where on the local filesystem the DFS name node
+      should store the transaction (edits) file. If this is a comma-delimited list
+      of directories then the transaction file is replicated in all of the
+      directories, for redundancy. The default value is the same as dfs.name.dir.
+  </description>
+</property>
+<property>
   <name>dfs.web.ugi</name>
   <value>webuser,webgroup</value>
   <description>The user account used by the web interface.

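For reference, both new keys default to the corresponding image settings
(${fs.checkpoint.dir} and ${dfs.name.dir}), so existing deployments keep their
current layout unless they override them. A minimal sketch of how the name-node
resolves the two sets of directories (the paths and the sketch class are
hypothetical; getNamespaceEditsDirs() is the helper added to FSNamesystem later
in this patch):

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.File;
    import java.util.Collection;
    import org.apache.hadoop.conf.Configuration;

    public class NameEditsDirsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical layout: fsimage on one disk, edits on another.
        conf.set("dfs.name.dir", "/disk1/dfs/name");
        conf.set("dfs.name.edits.dir", "/disk2/dfs/edits");
        Collection<File> imageDirs = FSNamesystem.getNamespaceDirs(conf);
        Collection<File> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf);
        // With dfs.name.edits.dir unset, editsDirs would equal imageDirs,
        // because its default value expands to ${dfs.name.dir}.
        System.out.println("image dirs: " + imageDirs);
        System.out.println("edits dirs: " + editsDirs);
      }
    }
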
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Fri Aug 29 16:13:12 2008
@@ -97,22 +97,117 @@
     NORMAL;
   }
   
+  /**
+   * An interface to denote the type of a storage directory.
+   * Implementations can define a type for a storage directory by
+   * implementing this interface.
+   */
+  public interface StorageDirType {
+    public StorageDirType getStorageDirType();
+    public boolean isOfType(StorageDirType type);
+  }
+  
   private NodeType storageType;    // Type of the node using this storage 
   protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
   
+  private class DirIterator implements Iterator<StorageDirectory> {
+    StorageDirType dirType;
+    int prevIndex; // for remove()
+    int nextIndex; // for next()
+    
+    DirIterator(StorageDirType dirType) {
+      this.dirType = dirType;
+      this.nextIndex = 0;
+      this.prevIndex = 0;
+    }
+    
+    public boolean hasNext() {
+      if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
+        return false;
+      if (dirType != null) {
+        while (nextIndex < storageDirs.size()) {
+          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+            break;
+          nextIndex++;
+        }
+        if (nextIndex >= storageDirs.size())
+         return false;
+      }
+      return true;
+    }
+    
+    public StorageDirectory next() {
+      StorageDirectory sd = getStorageDir(nextIndex);
+      prevIndex = nextIndex;
+      nextIndex++;
+      if (dirType != null) {
+        while (nextIndex < storageDirs.size()) {
+          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+            break;
+          nextIndex++;
+        }
+      }
+      return sd;
+    }
+    
+    public void remove() {
+      nextIndex = prevIndex; // restore previous state
+      storageDirs.remove(prevIndex); // remove last returned element
+      hasNext(); // reset nextIndex to correct place
+    }
+  }
+  
+  /**
+   * Return the default iterator.
+   * This iterator returns all entries of storageDirs.
+   */
+  public Iterator<StorageDirectory> dirIterator() {
+    return dirIterator(null);
+  }
+  
+  /**
+   * Return an iterator based on the storage directory type.
+   * This iterator selects entries of storageDirs of type dirType and returns
+   * them via the Iterator.
+   */
+  public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
+    return new DirIterator(dirType);
+  }
+  
   /**
    * One of the storage directories.
    */
   public class StorageDirectory {
-    public File              root; // root directory
+    File              root; // root directory
     FileLock          lock; // storage lock
+    StorageDirType dirType; // storage dir type
     
     public StorageDirectory(File dir) {
+      // default dirType is null
+      this(dir, null);
+    }
+    
+    public StorageDirectory(File dir, StorageDirType dirType) {
       this.root = dir;
       this.lock = null;
+      this.dirType = dirType;
+    }
+    
+    /**
+     * Get root directory of this storage
+     */
+    public File getRoot() {
+      return root;
     }
 
     /**
+     * Get storage directory type
+     */
+    public StorageDirType getStorageDirType() {
+      return dirType;
+    }
+    
+    /**
      * Read version file.
      * 
      * @throws IOException if file cannot be read or contains inconsistent data

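The typed directory iterator above is the idiom the rest of this patch builds
on. A minimal sketch of a custom StorageDirType and the iteration pattern (the
DemoDirType enum and the storage variable are illustrative only; the real
name-node types are the NameNodeDirType enum added to FSImage.java below):

    // Illustrative two-valued type, analogous in shape to NameNodeDirType.
    enum DemoDirType implements Storage.StorageDirType {
      DATA, META;
      public Storage.StorageDirType getStorageDirType() { return this; }
      public boolean isOfType(Storage.StorageDirType type) { return this == type; }
    }

    // Walk only the directories of a given type; dirIterator() with no
    // argument would visit every directory regardless of type.
    for (Iterator<StorageDirectory> it = storage.dirIterator(DemoDirType.META);
         it.hasNext();) {
      StorageDirectory sd = it.next();
      System.out.println("meta dir: " + sd.getRoot());
    }
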
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Aug 29 16:13:12 2008
@@ -179,14 +179,14 @@
     if (ssid == null ||
         !("".equals(storageID) || "".equals(ssid) ||
           storageID.equals(ssid)))
-      throw new InconsistentFSStateException(sd.root,
+      throw new InconsistentFSStateException(sd.getRoot(),
                                              "has incompatible storage Id.");
     if ("".equals(storageID)) // update id only if it was empty
       storageID = ssid;
   }
 
   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
-    File oldF = new File(sd.root, "storage");
+    File oldF = new File(sd.getRoot(), "storage");
     if (!oldF.exists())
       return false;
     // check the layout version inside the storage file
@@ -230,7 +230,7 @@
       "Future version is not allowed";
     if (getNamespaceID() != nsInfo.getNamespaceID())
       throw new IOException(
-                            "Incompatible namespaceIDs in " + sd.root.getCanonicalPath()
+                            "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
                             + ": namenode namespaceID = " + nsInfo.getNamespaceID() 
                             + "; datanode namespaceID = " + getNamespaceID());
     if (this.layoutVersion == FSConstants.LAYOUT_VERSION 
@@ -262,7 +262,7 @@
   void doUpgrade(StorageDirectory sd,
                  NamespaceInfo nsInfo
                  ) throws IOException {
-    LOG.info("Upgrading storage directory " + sd.root 
+    LOG.info("Upgrading storage directory " + sd.getRoot()
              + ".\n   old LV = " + this.getLayoutVersion()
              + "; old CTime = " + this.getCTime()
              + ".\n   new LV = " + nsInfo.getLayoutVersion()
@@ -287,7 +287,7 @@
     sd.write();
     // rename tmp to previous
     rename(tmpDir, prevDir);
-    LOG.info("Upgrade of " + sd.root + " is complete.");
+    LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
   }
 
   void doRollback( StorageDirectory sd,
@@ -298,19 +298,19 @@
     if (!prevDir.exists())
       return;
     DataStorage prevInfo = new DataStorage();
-    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.root);
+    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
     prevSD.read(prevSD.getPreviousVersionFile());
 
     // We allow rollback to a state, which is either consistent with
     // the namespace state or can be further upgraded to it.
     if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
           && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
-      throw new InconsistentFSStateException(prevSD.root,
+      throw new InconsistentFSStateException(prevSD.getRoot(),
                                              "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                              + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                              + " is newer than the namespace state: LV = "
                                              + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
-    LOG.info("Rolling back storage directory " + sd.root 
+    LOG.info("Rolling back storage directory " + sd.getRoot()
              + ".\n   target LV = " + nsInfo.getLayoutVersion()
              + "; target CTime = " + nsInfo.getCTime());
     File tmpDir = sd.getRemovedTmp();
@@ -323,14 +323,14 @@
     rename(prevDir, curDir);
     // delete tmp dir
     deleteDir(tmpDir);
-    LOG.info("Rollback of " + sd.root + " is complete.");
+    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
   }
 
   void doFinalize(StorageDirectory sd) throws IOException {
     File prevDir = sd.getPreviousDir();
     if (!prevDir.exists())
       return; // already discarded
-    final String dataDirPath = sd.root.getCanonicalPath();
+    final String dataDirPath = sd.getRoot().getCanonicalPath();
     LOG.info("Finalizing upgrade for storage directory " 
              + dataDirPath 
              + ".\n   cur LV = " + this.getLayoutVersion()

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Aug 29 16:13:12 2008
@@ -54,7 +54,8 @@
   /** Access an existing dfs name directory. */
   public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this(new FSImage(), ns, conf);
-    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null));
+    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
+                                FSImage.getCheckpointEditsDirs(conf, null));
   }
 
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
@@ -73,22 +74,23 @@
   }
 
   void loadFSImage(Collection<File> dataDirs,
+                   Collection<File> editsDirs,
                    StartupOption startOpt) throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
-      fsImage.setStorageDirectories(dataDirs);
+      fsImage.setStorageDirectories(dataDirs, editsDirs);
       fsImage.format();
       startOpt = StartupOption.REGULAR;
     }
     try {
-      if (fsImage.recoverTransitionRead(dataDirs, startOpt)) {
+      if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
         fsImage.saveFSImage();
       }
       FSEditLog editLog = fsImage.getEditLog();
       assert editLog != null : "editLog must be initialized";
       if (!editLog.isOpen())
         editLog.open();
-      fsImage.setCheckpointDirectories(null);
+      fsImage.setCheckpointDirectories(null, null);
     } catch(IOException e) {
       fsImage.close();
       throw e;

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Aug 29 16:13:12 2008
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.lang.Math;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
@@ -36,6 +37,8 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.permission.*;
@@ -231,6 +234,13 @@
                               " at offset " +  newsize);
       }
     }
+    
+    /**
+     * Returns the file associated with this stream
+     */
+    File getFile() {
+      return file;
+    }
   }
 
   static class EditLogFileInputStream extends EditLogInputStream {
@@ -275,17 +285,21 @@
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = FSNamesystem.now();
   }
-
-  private File getEditFile(int idx) {
-    return fsimage.getEditFile(idx);
+  
+  private File getEditFile(StorageDirectory sd) {
+    return fsimage.getEditFile(sd);
   }
-
-  private File getEditNewFile(int idx) {
-    return fsimage.getEditNewFile(idx);
+  
+  private File getEditNewFile(StorageDirectory sd) {
+    return fsimage.getEditNewFile(sd);
   }
   
   private int getNumStorageDirs() {
-    return fsimage.getNumStorageDirs();
+    int numStorageDirs = 0;
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
+      numStorageDirs++;
+    return numStorageDirs;
   }
   
   synchronized int getNumEditStreams() {
@@ -304,18 +318,19 @@
    */
   public synchronized void open() throws IOException {
     numTransactions = totalTimeTransactions = 0;
-    int size = getNumStorageDirs();
     if (editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>(size);
-    for (int idx = 0; idx < size; idx++) {
-      File eFile = getEditFile(idx);
+      editStreams = new ArrayList<EditLogOutputStream>();
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      File eFile = getEditFile(sd);
       try {
         EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
         editStreams.add(eStream);
       } catch (IOException e) {
         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
-        fsimage.processIOError(idx);
-        idx--;
+        // Remove the directory from list of storage directories
+        it.remove();
       }
     }
   }
@@ -330,8 +345,9 @@
    * Create edits.new if non existent.
    */
   synchronized void createNewIfMissing() throws IOException {
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      File newFile = getEditNewFile(idx);
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      File newFile = getEditNewFile(it.next());
       if (!newFile.exists())
         createEditLogFile(newFile);
     }
@@ -380,14 +396,39 @@
     }
     assert(index < getNumStorageDirs());
     assert(getNumStorageDirs() == editStreams.size());
-
+    
+    File parentStorageDir = ((EditLogFileOutputStream)editStreams
+                                      .get(index)).getFile()
+                                      .getParentFile().getParentFile();
     editStreams.remove(index);
     //
     // Invoke the ioerror routine of the fsimage
     //
-    fsimage.processIOError(index);
+    fsimage.processIOError(parentStorageDir);
   }
-
+  
+  /**
+   * If there is an IO error on any log operation in a storage directory,
+   * remove any stream associated with that directory.
+   */
+  synchronized void processIOError(StorageDirectory sd) {
+    // Try to remove stream only if one should exist
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+      return;
+    if (editStreams == null || editStreams.size() <= 1) {
+      FSNamesystem.LOG.fatal(
+          "Fatal Error : All storage directories are inaccessible."); 
+      Runtime.getRuntime().exit(-1);
+    }
+    for (int idx = 0; idx < editStreams.size(); idx++) {
+      File parentStorageDir = ((EditLogFileOutputStream)editStreams
+                                       .get(idx)).getFile()
+                                       .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+        editStreams.remove(idx);
+    }
+  }
+  
   /**
    * The specified streams have IO errors. Remove them from logging
    * new transactions.
@@ -412,20 +453,16 @@
       }
       processIOError(j);
     }
-    int failedStreamIdx = 0;
-    while(failedStreamIdx >= 0) {
-      failedStreamIdx = fsimage.incrementCheckpointTime();
-      if(failedStreamIdx >= 0)
-        processIOError(failedStreamIdx);
-    }
+    fsimage.incrementCheckpointTime();
   }
 
   /**
    * check if ANY edits.new log exists
    */
   boolean existsNew() throws IOException {
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      if (getEditNewFile(idx).exists()) { 
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      if (getEditNewFile(it.next()).exists()) { 
         return true;
       }
     }
@@ -1022,7 +1059,7 @@
   synchronized long getEditLogSize() throws IOException {
     assert(getNumStorageDirs() == editStreams.size());
     long size = 0;
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+    for (int idx = 0; idx < editStreams.size(); idx++) {
       long curSize = editStreams.get(idx).length();
       assert (size == 0 || size == curSize) : "All streams must be the same";
       size = curSize;
@@ -1040,10 +1077,12 @@
     // exists in all directories.
     //
     if (existsNew()) {
-      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-        if (!getEditNewFile(idx).exists()) { 
+      for (Iterator<StorageDirectory> it = 
+               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editsNew = getEditNewFile(it.next());
+        if (!editsNew.exists()) { 
           throw new IOException("Inconsistent existance of edits.new " +
-                                getEditNewFile(idx));
+                                editsNew);
         }
       }
       return; // nothing to do, edits.new exists!
@@ -1054,14 +1093,18 @@
     //
     // Open edits.new
     //
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
       try {
-        EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(idx));
+        EditLogFileOutputStream eStream = 
+             new EditLogFileOutputStream(getEditNewFile(sd));
         eStream.create();
         editStreams.add(eStream);
       } catch (IOException e) {
-        processIOError(idx);
-        idx--;
+        // remove stream and this storage directory from list
+        processIOError(sd);
+        it.remove();
       }
     }
   }
@@ -1083,16 +1126,18 @@
     //
     // Delete edits and rename edits.new to edits.
     //
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
         //
         // renameTo() fails on Windows if the destination
         // file exists.
         //
-        getEditFile(idx).delete();
-        if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
-          fsimage.processIOError(idx); 
-          idx--; 
+        getEditFile(sd).delete();
+        if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
+          // Should we also remove from edits
+          it.remove(); 
         }
       }
     }
@@ -1106,7 +1151,11 @@
    * Return the name of the edit file
    */
   synchronized File getFsEditName() throws IOException {
-    return getEditFile(0);
+    StorageDirectory sd = null;
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
+      sd = it.next();
+    return getEditFile(sd);
   }
 
   /**

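A pattern worth calling out: everywhere FSEditLog used to walk directories by
index and decrement on failure, it now iterates the EDITS-typed directories
and drops a failed one through the iterator. A condensed sketch of the idiom
from the new open() above (tryOpen() is a hypothetical stand-in for creating
an EditLogFileOutputStream):

    for (Iterator<StorageDirectory> it =
           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
      StorageDirectory sd = it.next();
      try {
        tryOpen(sd);   // e.g. new EditLogFileOutputStream(getEditFile(sd))
      } catch (IOException e) {
        it.remove();   // DirIterator.remove() drops the failed directory
      }
    }
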
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Aug 29 16:13:12 2008
@@ -29,13 +29,14 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.text.SimpleDateFormat;
-import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Map;
+import java.util.HashMap;
 import java.lang.Math;
 import java.nio.ByteBuffer;
 
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -83,6 +85,29 @@
     String getName() {return fileName;}
   }
   
+  /**
+   * Implementation of StorageDirType specific to namenode storage.
+   * A storage directory can be of type IMAGE, which stores only the fsimage;
+   * of type EDITS, which stores only edits; or of type IMAGE_AND_EDITS,
+   * which stores both the fsimage and edits.
+   */
+  static enum NameNodeDirType implements StorageDirType {
+    UNDEFINED,
+    IMAGE,
+    EDITS,
+    IMAGE_AND_EDITS;
+    
+    public StorageDirType getStorageDirType() {
+      return this;
+    }
+    
+    public boolean isOfType(StorageDirType type) {
+      if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
+        return true;
+      return this == type;
+    }
+  }
+  
   protected long checkpointTime = -1L;
   private FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
@@ -90,6 +115,7 @@
    * Directories for importing an image from a checkpoint.
    */
   private Collection<File> checkpointDirs;
+  private Collection<File> checkpointEditsDirs;
 
   /**
    * Can fs-image be rolled?
@@ -111,9 +137,10 @@
 
   /**
    */
-  FSImage(Collection<File> fsDirs) throws IOException {
+  FSImage(Collection<File> fsDirs, Collection<File> fsEditsDirs) 
+    throws IOException {
     this();
-    setStorageDirectories(fsDirs);
+    setStorageDirectories(fsDirs, fsEditsDirs);
   }
 
   public FSImage(StorageInfo storageInfo) {
@@ -126,57 +153,77 @@
   public FSImage(File imageDir) throws IOException {
     this();
     ArrayList<File> dirs = new ArrayList<File>(1);
+    ArrayList<File> editsDirs = new ArrayList<File>(1);
     dirs.add(imageDir);
-    setStorageDirectories(dirs);
+    editsDirs.add(imageDir);
+    setStorageDirectories(dirs, editsDirs);
   }
   
-  void setStorageDirectories(Collection<File> fsDirs) throws IOException {
-    this.storageDirs = new ArrayList<StorageDirectory>(fsDirs.size());
-    for(Iterator<File> it = fsDirs.iterator(); it.hasNext();)
-      this.addStorageDir(new StorageDirectory(it.next()));
+  void setStorageDirectories(Collection<File> fsNameDirs,
+                        Collection<File> fsEditsDirs
+                             ) throws IOException {
+    this.storageDirs = new ArrayList<StorageDirectory>();
+    // Add all name dirs with appropriate NameNodeDirType
+    for (File dirName : fsNameDirs) {
+      boolean isAlsoEdits = false;
+      for (File editsDirName : fsEditsDirs) {
+        if (editsDirName.compareTo(dirName) == 0) {
+          isAlsoEdits = true;
+          fsEditsDirs.remove(editsDirName);
+          break;
+        }
+      }
+      NameNodeDirType dirType = (isAlsoEdits) ?
+                          NameNodeDirType.IMAGE_AND_EDITS :
+                          NameNodeDirType.IMAGE;
+      this.addStorageDir(new StorageDirectory(dirName, dirType));
+    }
+    
+    // Add edits dirs if they are different from name dirs
+    for (File dirName : fsEditsDirs) {
+      this.addStorageDir(new StorageDirectory(dirName, 
+                    NameNodeDirType.EDITS));
+    }
   }
 
-  void setCheckpointDirectories(Collection<File> dirs) {
+  void setCheckpointDirectories(Collection<File> dirs,
+                                Collection<File> editsDirs) {
     checkpointDirs = dirs;
-  }
-
-  /**
-   */
-  File getImageFile(int imageDirIdx, NameNodeFile type) {
-    return getImageFile(getStorageDir(imageDirIdx), type);
+    checkpointEditsDirs = editsDirs;
   }
   
   static File getImageFile(StorageDirectory sd, NameNodeFile type) {
     return new File(sd.getCurrentDir(), type.getName());
   }
   
-  File getEditFile(int idx) {
-    return getImageFile(idx, NameNodeFile.EDITS);
+  File getEditFile(StorageDirectory sd) {
+    return getImageFile(sd, NameNodeFile.EDITS);
   }
   
-  File getEditNewFile(int idx) {
-    return getImageFile(idx, NameNodeFile.EDITS_NEW);
+  File getEditNewFile(StorageDirectory sd) {
+    return getImageFile(sd, NameNodeFile.EDITS_NEW);
   }
 
-  File[] getFileNames(NameNodeFile type) {
-    File[] list = new File[getNumStorageDirs()];
-    int i=0;
-    for(StorageDirectory sd : storageDirs) {
-      list[i++] = getImageFile(sd, type);
+  File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
+    ArrayList<File> list = new ArrayList<File>();
+    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
+                                    dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      list.add(getImageFile(it.next(), type));
     }
-    return list;
+    return list.toArray(new File[list.size()]);
   }
 
   File[] getImageFiles() {
-    return getFileNames(NameNodeFile.IMAGE);
+    return getFileNames(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
   }
 
   File[] getEditsFiles() {
-    return getFileNames(NameNodeFile.EDITS);
+    return getFileNames(NameNodeFile.EDITS, NameNodeDirType.EDITS);
   }
 
   File[] getTimeFiles() {
-    return getFileNames(NameNodeFile.TIME);
+    return getFileNames(NameNodeFile.TIME, null);
   }
 
   /**
@@ -191,25 +238,36 @@
    * @return true if the image needs to be saved or false otherwise
    */
   boolean recoverTransitionRead(Collection<File> dataDirs,
-                             StartupOption startOpt
-                             ) throws IOException {
+                             Collection<File> editsDirs,
+                                StartupOption startOpt
+                                ) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
-
+    
+    // none of the data dirs exist
+    if (dataDirs.size() == 0 || editsDirs.size() == 0)  
+      throw new IOException(
+        "All specified directories are not accessible or do not exist.");
+    
     if(startOpt == StartupOption.IMPORT 
         && (checkpointDirs == null || checkpointDirs.isEmpty()))
       throw new IOException("Cannot import image from a checkpoint. "
                           + "\"fs.checkpoint.dir\" is not set." );
 
+    if(startOpt == StartupOption.IMPORT 
+        && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
+      throw new IOException("Cannot import image from a checkpoint. "
+                          + "\"fs.checkpoint.edits.dir\" is not set." );
+    
+    setStorageDirectories(dataDirs, editsDirs);
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    AbstractList<StorageState> dataDirStates = 
-      new ArrayList<StorageState>(dataDirs.size());
+    Map<StorageDirectory, StorageState> dataDirStates = 
+             new HashMap<StorageDirectory, StorageState>();
     boolean isFormatted = false;
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next();
-      StorageDirectory sd = new StorageDirectory(dataDir);
+    for (Iterator<StorageDirectory> it = 
+                      dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt);
@@ -217,7 +275,7 @@
         switch(curState) {
         case NON_EXISTENT:
           // name-node fails if any of the configured storage dirs are missing
-          throw new InconsistentFSStateException(sd.root,
+          throw new InconsistentFSStateException(sd.getRoot(),
                                                  "storage directory does not exist or is not accessible.");
         case NOT_FORMATTED:
           break;
@@ -234,19 +292,14 @@
         if (startOpt == StartupOption.IMPORT && isFormatted)
           // import of a checkpoint is allowed only into empty image directories
           throw new IOException("Cannot import image from a checkpoint. " 
-              + " NameNode already contains an image in " + sd.root);
+              + " NameNode already contains an image in " + sd.getRoot());
       } catch (IOException ioe) {
         sd.unlock();
         throw ioe;
       }
-      // add to the storage list
-      addStorageDir(sd);
-      dataDirStates.add(curState);
+      dataDirStates.put(sd,curState);
     }
-
-    if (dataDirs.size() == 0)  // none of the data dirs exist
-      throw new IOException(
-        "All specified directories are not accessible or do not exist.");
+    
     if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                      && startOpt != StartupOption.IMPORT)
       throw new IOException("NameNode is not formatted.");
@@ -265,14 +318,15 @@
 
     // 2. Format unformatted dirs.
     this.checkpointTime = 0L;
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
-      StorageState curState = dataDirStates.get(idx);
+    for (Iterator<StorageDirectory> it = 
+                     dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      StorageState curState = dataDirStates.get(sd);
       switch(curState) {
       case NON_EXISTENT:
         assert false : StorageState.NON_EXISTENT + " state cannot be here";
       case NOT_FORMATTED:
-        LOG.info("Storage directory " + sd.root + " is not formatted.");
+        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
         LOG.info("Formatting ...");
         sd.clearDirectory(); // create empty currrent dir
         break;
@@ -308,10 +362,11 @@
     }
     // Upgrade is allowed only if there are 
     // no previous fs states in any of the directories
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       if (sd.getPreviousDir().exists())
-        throw new InconsistentFSStateException(sd.root,
+        throw new InconsistentFSStateException(sd.getRoot(),
                                                "previous fs state should not exist during upgrade. "
                                                + "Finalize or rollback first.");
     }
@@ -325,9 +380,10 @@
     int oldLV = this.getLayoutVersion();
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.checkpointTime = FSNamesystem.now();
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
-      LOG.info("Upgrading image directory " + sd.root 
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      LOG.info("Upgrading image directory " + sd.getRoot()
                + ".\n   old LV = " + oldLV
                + "; old CTime = " + oldCTime
                + ".\n   new LV = " + this.getLayoutVersion()
@@ -350,7 +406,7 @@
       // rename tmp to previous
       rename(tmpDir, prevDir);
       isUpgradeFinalized = false;
-      LOG.info("Upgrade of " + sd.root + " is complete.");
+      LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
     }
     initializeDistributedUpgrade();
     editLog.open();
@@ -363,16 +419,17 @@
     boolean canRollback = false;
     FSImage prevState = new FSImage();
     prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                       dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       File prevDir = sd.getPreviousDir();
       if (!prevDir.exists()) {  // use current directory then
-        LOG.info("Storage directory " + sd.root
+        LOG.info("Storage directory " + sd.getRoot()
                  + " does not contain previous fs state.");
         sd.read(); // read and verify consistency with other directories
         continue;
       }
-      StorageDirectory sdPrev = prevState.new StorageDirectory(sd.root);
+      StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
       sdPrev.read(sdPrev.getPreviousVersionFile());  // read and verify consistency of the prev dir
       canRollback = true;
     }
@@ -382,13 +439,14 @@
 
     // Now that we know all directories are going to be consistent
     // Do rollback for each directory containing previous state
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                          dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       File prevDir = sd.getPreviousDir();
       if (!prevDir.exists())
         continue;
 
-      LOG.info("Rolling back storage directory " + sd.root 
+      LOG.info("Rolling back storage directory " + sd.getRoot()
                + ".\n   new LV = " + prevState.getLayoutVersion()
                + "; new CTime = " + prevState.getCTime());
       File tmpDir = sd.getRemovedTmp();
@@ -402,7 +460,7 @@
 
       // delete tmp dir
       deleteDir(tmpDir);
-      LOG.info("Rollback of " + sd.root + " is complete.");
+      LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
     }
     isUpgradeFinalized = true;
     // check whether name-node can start in regular mode
@@ -413,11 +471,11 @@
     File prevDir = sd.getPreviousDir();
     if (!prevDir.exists()) { // already discarded
       LOG.info("Directory " + prevDir + " does not exist.");
-      LOG.info("Finalize upgrade for " + sd.root + " is not required.");
+      LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
       return;
     }
     LOG.info("Finalizing upgrade for storage directory " 
-             + sd.root + "."
+             + sd.getRoot() + "."
              + (getLayoutVersion()==0 ? "" :
                    "\n   cur LV = " + this.getLayoutVersion()
                    + "; cur CTime = " + this.getCTime()));
@@ -427,7 +485,7 @@
     rename(prevDir, tmpDir);
     deleteDir(tmpDir);
     isUpgradeFinalized = true;
-    LOG.info("Finalize upgrade for " + sd.root + " is complete.");
+    LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
   }
 
   /**
@@ -443,7 +501,8 @@
     fsNamesys.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
-      ckptImage.recoverTransitionRead(checkpointDirs, StartupOption.REGULAR);
+      ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
+                                              StartupOption.REGULAR);
     } finally {
       ckptImage.close();
     }
@@ -455,8 +514,10 @@
   }
 
   void finalizeUpgrade() throws IOException {
-    for(int idx = 0; idx < getNumStorageDirs(); idx++)
-      doFinalize(getStorageDir(idx));
+    for (Iterator<StorageDirectory> it = 
+                          dirIterator(); it.hasNext();) {
+      doFinalize(it.next());
+    }
   }
 
   boolean isUpgradeFinalized() {
@@ -469,7 +530,7 @@
     super.getFields(props, sd);
     if (layoutVersion == 0)
       throw new IOException("NameNode directory " 
-                            + sd.root + " is not formatted.");
+                            + sd.getRoot() + " is not formatted.");
     String sDUS, sDUV;
     sDUS = props.getProperty("distributedUpgradeState"); 
     sDUV = props.getProperty("distributedUpgradeVersion");
@@ -539,30 +600,38 @@
   /**
    * Record new checkpoint time in order to
    * distinguish healthy directories from the removed ones.
-   * 
-   * @return -1 if successful, or the index of the failed storage directory.
+   * If there is an error writing new checkpoint time, the corresponding
+   * storage directory is removed from the list.
    */
-  int incrementCheckpointTime() {
+  void incrementCheckpointTime() {
     this.checkpointTime++;
-    // Write new checkpoint time.
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
+    
+    // Write new checkpoint time in all storage directories
+    for(Iterator<StorageDirectory> it =
+                          dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       try {
-        StorageDirectory sd = getStorageDir(idx);
         writeCheckpointTime(sd);
-      } catch(IOException e) { 
-        return idx;
+      } catch(IOException e) {
+        // Close any edits stream associated with this dir and remove directory
+        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+          editLog.processIOError(sd);
+        it.remove();
       }
     }
-    return -1;
   }
-
+  
   /**
-   * If there is an IO Error on any log operations, remove that
-   * directory from the list of directories.
+   * Remove the storage directory that contains the given directory.
    */
-  void processIOError(int index) {
-    assert(index >= 0 && index < getNumStorageDirs());
-    storageDirs.remove(index);
+  
+  void processIOError(File dirName) {
+    for (Iterator<StorageDirectory> it = 
+            dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if (sd.getRoot().getPath().equals(dirName.getParent()))
+        it.remove();
+    }
   }
 
   public FSEditLog getEditLog() {
@@ -570,10 +639,10 @@
   }
 
   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
-    File oldImageDir = new File(sd.root, "image");
+    File oldImageDir = new File(sd.getRoot(), "image");
     if (!oldImageDir.exists()) {
       if(sd.getVersionFile().exists())
-        throw new InconsistentFSStateException(sd.root,
+        throw new InconsistentFSStateException(sd.getRoot(),
             oldImageDir + " does not exist.");
       return false;
     }
@@ -594,15 +663,19 @@
   //
   // Atomic move sequence, to recover from interrupted checkpoint
   //
-  void recoverInterruptedCheckpoint(StorageDirectory sd) throws IOException {
-    File curFile = getImageFile(sd, NameNodeFile.IMAGE);
-    File ckptFile = getImageFile(sd, NameNodeFile.IMAGE_NEW);
+  boolean recoverInterruptedCheckpoint(StorageDirectory nameSD,
+                                       StorageDirectory editsSD) 
+                                       throws IOException {
+    boolean needToSave = false;
+    File curFile = getImageFile(nameSD, NameNodeFile.IMAGE);
+    File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW);
 
     //
     // If we were in the midst of a checkpoint
     //
     if (ckptFile.exists()) {
-      if (getImageFile(sd, NameNodeFile.EDITS_NEW).exists()) {
+      needToSave = true;
+      if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
         //
         // checkpointing migth have uploaded a new
         // merged image, but we discard it here because we are
@@ -622,7 +695,8 @@
         // if the destination file already exists.
         //
         if (!ckptFile.renameTo(curFile)) {
-          curFile.delete();
+          if (!curFile.delete())
+            LOG.warn("Unable to delete dir " + curFile + " before rename");
           if (!ckptFile.renameTo(curFile)) {
             throw new IOException("Unable to rename " + ckptFile +
                                   " to " + curFile);
@@ -630,6 +704,7 @@
         }
       }
     }
+    return needToSave;
   }
 
   /**
@@ -641,53 +716,74 @@
    */
   boolean loadFSImage() throws IOException {
     // Now check all curFiles and see which is the newest
-    long latestCheckpointTime = Long.MIN_VALUE;
-    StorageDirectory latestSD = null;
+    long latestNameCheckpointTime = Long.MIN_VALUE;
+    long latestEditsCheckpointTime = Long.MIN_VALUE;
+    StorageDirectory latestNameSD = null;
+    StorageDirectory latestEditsSD = null;
     boolean needToSave = false;
     isUpgradeFinalized = true;
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
-      recoverInterruptedCheckpoint(sd);
+    for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       if (!sd.getVersionFile().exists()) {
         needToSave |= true;
         continue; // some of them might have just been formatted
       }
-      assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
-        "Image file must exist.";
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE))
+        assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
+          "Image file must exist.";
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+        assert getImageFile(sd, NameNodeFile.EDITS).exists() :
+          "Edits file must exist.";
+      
       checkpointTime = readCheckpointTime(sd);
       if ((checkpointTime != Long.MIN_VALUE) && 
-          (checkpointTime != latestCheckpointTime)) {
+          ((checkpointTime != latestNameCheckpointTime) || 
+           (checkpointTime != latestEditsCheckpointTime))) {
         // Force saving of new image if checkpoint time
         // is not same in all of the storage directories.
         needToSave |= true;
       }
-      if (latestCheckpointTime < checkpointTime) {
-        latestCheckpointTime = checkpointTime;
-        latestSD = sd;
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) && 
+         (latestNameCheckpointTime < checkpointTime)) {
+        latestNameCheckpointTime = checkpointTime;
+        latestNameSD = sd;
+      }
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) && 
+           (latestEditsCheckpointTime < checkpointTime)) {
+        latestEditsCheckpointTime = checkpointTime;
+        latestEditsSD = sd;
       }
       if (checkpointTime <= 0L)
         needToSave |= true;
       // set finalized flag
       isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
     }
-    assert latestSD != null : "Latest storage directory was not determined.";
+    assert latestNameSD != null : "Latest image storage directory was " +
+                                  "not determined.";
+    assert latestEditsSD != null : "Latest edits storage directory was " +
+                                   "not determined.";
+    
+    // Make sure we are loading image and edits from same checkpoint
+    if (latestNameCheckpointTime != latestEditsCheckpointTime)
+      throw new IOException("Inconsitent storage detected, " +
+                            "name and edits storage do not match");
+    
+    // Recover from a previous interrupted checkpoint, if any
+    needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
 
     long startTime = FSNamesystem.now();
-    long imageSize = getImageFile(latestSD, NameNodeFile.IMAGE).length();
+    long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
 
     //
     // Load in bits
     //
-    latestSD.read();
-    needToSave |= loadFSImage(getImageFile(latestSD, NameNodeFile.IMAGE));
-
+    latestNameSD.read();
+    needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
     LOG.info("Image file of size " + imageSize + " loaded in " 
         + (FSNamesystem.now() - startTime)/1000 + " seconds.");
-    //
-    // read in the editlog from the same directory from
-    // which we read in the image
-    //
-    needToSave |= (loadFSEdits(latestSD) > 0);
+    
+    // Load latest edits
+    needToSave |= (loadFSEdits(latestEditsSD) > 0);
     
     return needToSave;
   }
@@ -911,13 +1007,18 @@
    */
   public void saveFSImage() throws IOException {
     editLog.createNewIfMissing();
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
-      saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
-      editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
-      File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
-      if (editsNew.exists()) 
-        editLog.createEditLogFile(editsNew);
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+      if (dirType.isOfType(NameNodeDirType.IMAGE))
+        saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
+      if (dirType.isOfType(NameNodeDirType.EDITS)) {    
+        editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+        File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
+        if (editsNew.exists()) 
+          editLog.createEditLogFile(editsNew);
+      }
     }
     ckptState = CheckpointStates.UPLOAD_DONE;
     rollFSImage();
@@ -950,13 +1051,16 @@
     sd.clearDirectory(); // create currrent dir
     sd.lock();
     try {
-      saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
-      editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+      NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+      if (dirType.isOfType(NameNodeDirType.IMAGE))
+        saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
+      if (dirType.isOfType(NameNodeDirType.EDITS))
+        editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
       sd.write();
     } finally {
       sd.unlock();
     }
-    LOG.info("Storage directory " + sd.root 
+    LOG.info("Storage directory " + sd.getRoot()
              + " has been successfully formatted.");
   }
 
@@ -965,8 +1069,9 @@
     this.namespaceID = newNamespaceID();
     this.cTime = 0L;
     this.checkpointTime = FSNamesystem.now();
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       format(sd);
     }
   }
@@ -1153,8 +1258,9 @@
     if (!editLog.existsNew()) {
       throw new IOException("New Edits file does not exist");
     }
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       if (!ckpt.exists()) {
         throw new IOException("Checkpoint file " + ckpt +
@@ -1166,8 +1272,9 @@
     //
     // Renames new image
     //
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
       // renameTo fails on Windows if the destination file 
@@ -1175,25 +1282,31 @@
       if (!ckpt.renameTo(curFile)) {
         curFile.delete();
         if (!ckpt.renameTo(curFile)) {
-          editLog.processIOError(idx);
-          idx--;
+          // Close edit stream, if this directory is also used for edits
+          if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+            editLog.processIOError(sd);
+          it.remove();
         }
       }
     }
 
     //
-    // Updates the fstime file and write version file
+    // Updates the fstime file on all directories (fsimage and edits)
+    // and write version file
     //
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.checkpointTime = FSNamesystem.now();
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      StorageDirectory sd = getStorageDir(idx);
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       try {
         sd.write();
       } catch (IOException e) {
-        LOG.error("Cannot write file " + sd.root, e);
-        editLog.processIOError(idx);
-        idx--;
+        LOG.error("Cannot write file " + sd.getRoot(), e);
+        // Close edit stream, if this directory is also used for edits
+        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+          editLog.processIOError(sd);
+        it.remove();
       }
     }
     ckptState = CheckpointStates.START;
@@ -1244,7 +1357,11 @@
    * Return the name of the image file.
    */
   File getFsImageName() {
-    return getImageFile(0, NameNodeFile.IMAGE);
+    StorageDirectory sd = null;
+    for (Iterator<StorageDirectory> it = 
+                dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+      sd = it.next();
+    return getImageFile(sd, NameNodeFile.IMAGE); 
   }
 
   public File getFsEditName() throws IOException {
@@ -1252,7 +1369,12 @@
   }
 
   File getFsTimeName() {
-    return getImageFile(0, NameNodeFile.TIME);
+    StorageDirectory sd = null;
+    // NameNodeFile.TIME should be the same in all directories
+    for (Iterator<StorageDirectory> it = 
+             dirIterator(); it.hasNext();)
+      sd = it.next();
+    return getImageFile(sd, NameNodeFile.TIME);
   }
 
   /**
@@ -1260,11 +1382,12 @@
    * checkpointing.
    */
   File[] getFsImageNameCheckpoint() {
-    File[] list = new File[getNumStorageDirs()];
-    for(int i = 0; i < getNumStorageDirs(); i++) {
-      list[i] = getImageFile(getStorageDir(i), NameNodeFile.IMAGE_NEW);
+    ArrayList<File> list = new ArrayList<File>();
+    for (Iterator<StorageDirectory> it = 
+                 dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW));
     }
-    return list;
+    return list.toArray(new File[list.size()]);
   }
 
   /**
@@ -1383,6 +1506,20 @@
     }
     return dirs;
   }
+  
+  static Collection<File> getCheckpointEditsDirs(Configuration conf,
+                                                 String defaultName) {
+    Collection<String> dirNames = 
+                conf.getStringCollection("fs.checkpoint.edits.dir");
+    if (dirNames.size() == 0 && defaultName != null) {
+      dirNames.add(defaultName);
+    }
+    Collection<File> dirs = new ArrayList<File>(dirNames.size());
+    for(String name : dirNames) {
+      dirs.add(new File(name));
+    }
+    return dirs;
+  }
 
   static private final UTF8 U_STR = new UTF8();
   static String readString(DataInputStream in) throws IOException {

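Taken together, setStorageDirectories() classifies each configured directory
by comparing the two lists: a directory named in both becomes IMAGE_AND_EDITS,
one named only in dfs.name.dir becomes IMAGE, and one named only in
dfs.name.edits.dir becomes EDITS. A runnable sketch of that classification
(paths and the sketch class are hypothetical; it uses package-private FSImage
methods, hence the package declaration):

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;

    import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;

    public class DirTypeSketch {
      public static void main(String[] args) throws IOException {
        Collection<File> nameDirs = new ArrayList<File>();
        nameDirs.add(new File("/d1/name"));
        nameDirs.add(new File("/d2/name"));
        Collection<File> editsDirs = new ArrayList<File>();
        editsDirs.add(new File("/d2/name"));   // shared with nameDirs
        editsDirs.add(new File("/d3/edits"));

        FSImage image = new FSImage();
        image.setStorageDirectories(nameDirs, editsDirs);
        // Expected: /d1/name -> IMAGE, /d2/name -> IMAGE_AND_EDITS,
        //           /d3/edits -> EDITS
        for (Iterator<StorageDirectory> it = image.dirIterator(); it.hasNext();) {
          StorageDirectory sd = it.next();
          System.out.println(sd.getRoot() + " -> " + sd.getStorageDirType());
        }
      }
    }

Because IMAGE_AND_EDITS.isOfType() answers true for both IMAGE and EDITS, a
shared directory like /d2/name is visited by both dirIterator(IMAGE) and
dirIterator(EDITS), which is what keeps the single-directory configuration
behaving exactly as it did before this change.
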
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Aug 29 16:13:12 2008
@@ -302,7 +302,8 @@
     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
     this.dir = new FSDirectory(this, conf);
     StartupOption startOpt = NameNode.getStartupOption(conf);
-    this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
+    this.dir.loadFSImage(getNamespaceDirs(conf),
+                         getNamespaceEditsDirs(conf), startOpt);
     long timeTakenToLoadFSImage = now() - systemStart;
     LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
     NameNode.getNameNodeMetrics().fsImageLoadTime.set(
@@ -385,6 +386,18 @@
     }
     return dirs;
   }
+
+  public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
+    Collection<String> editsDirNames = 
+            conf.getStringCollection("dfs.name.edits.dir");
+    if (editsDirNames.isEmpty())
+      editsDirNames.add("/tmp/hadoop/dfs/name");
+    Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
+    for(String name : editsDirNames) {
+      dirs.add(new File(name));
+    }
+    return dirs;
+  }
 
   /**
    * dirs is a list of directories where the filesystem directory state 
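
With getNamespaceEditsDirs in place, image and edits locations are resolved
independently: dfs.name.dir still feeds getNamespaceDirs, while the new
dfs.name.edits.dir key feeds the method above, falling back to
/tmp/hadoop/dfs/name when unset. A usage sketch; the paths are purely
illustrative:

import java.io.File;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

public class SeparateEditsDirsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Two image replicas on their own disks, edits on a third disk; the
    // comma-delimited form follows dfs.name.dir's existing convention.
    conf.set("dfs.name.dir", "/disk1/dfs/name,/disk2/dfs/name");
    conf.set("dfs.name.edits.dir", "/disk3/dfs/edits");
    Collection<File> imageDirs = FSNamesystem.getNamespaceDirs(conf);
    Collection<File> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf);
    System.out.println("image dirs: " + imageDirs);
    System.out.println("edits dirs: " + editsDirs);
  }
}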

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Aug 29 16:13:12 2008
@@ -742,6 +742,8 @@
                                 boolean isConfirmationNeeded
                                 ) throws IOException {
     Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
+    Collection<File> editDirsToFormat = 
+                 FSNamesystem.getNamespaceEditsDirs(conf);
     for(Iterator<File> it = dirsToFormat.iterator(); it.hasNext();) {
       File curDir = it.next();
       if (!curDir.exists())
@@ -756,7 +758,8 @@
       }
     }
 
-    FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat), conf);
+    FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
+                                         editDirsToFormat), conf);
     nsys.dir.fsImage.format();
     return false;
   }
@@ -765,7 +768,10 @@
                                boolean isConfirmationNeeded
                                ) throws IOException {
     Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
-    FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat), conf);
+    Collection<File> editDirsToFormat = 
+                               FSNamesystem.getNamespaceEditsDirs(conf);
+    FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
+                                         editDirsToFormat), conf);
     System.err.print(
         "\"finalize\" will remove the previous state of the files system.\n"
         + "Recent upgrade will become permanent.\n"
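
Both format() and finalize() now hand FSImage the two directory collections
up front. The FSNamesystem and FSImage constructors used here may be
package-scoped, so this sketch sits in the same package to be safe; the
directory values are illustrative only.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;

public class FormatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Collection<File> imageDirs = Arrays.asList(new File("/tmp/sketch/name"));
    Collection<File> editsDirs = Arrays.asList(new File("/tmp/sketch/edits"));
    // Mirrors the format() path above: the image is told about both
    // directory sets before the namespace is formatted.
    FSNamesystem nsys =
        new FSNamesystem(new FSImage(imageDirs, editsDirs), conf);
    nsys.dir.fsImage.format();
  }
}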

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Aug 29 16:13:12 2008
@@ -34,6 +34,8 @@
 import java.net.*;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 
 /**********************************************************
@@ -66,6 +68,7 @@
   private String infoBindAddress;
 
   private Collection<File> checkpointDirs;
+  private Collection<File> checkpointEditsDirs;
   private long checkpointPeriod;	// in seconds
   private long checkpointSize;    // size (in MB) of current Edit Log
 
@@ -135,8 +138,10 @@
     fsName = getInfoServer();
     checkpointDirs = FSImage.getCheckpointDirs(conf,
                                   "/tmp/hadoop/dfs/namesecondary");
+    checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, 
+                                  "/tmp/hadoop/dfs/namesecondary");    
     checkpointImage = new CheckpointStorage();
-    checkpointImage.recoverCreate(checkpointDirs);
+    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
 
     // Initialize other scheduling parameters from the configuration
     checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
@@ -329,7 +334,7 @@
   private void startCheckpoint() throws IOException {
     checkpointImage.unlockAll();
     checkpointImage.getEditLog().close();
-    checkpointImage.recoverCreate(checkpointDirs);
+    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
     checkpointImage.startCheckpoint();
   }
 
@@ -483,23 +488,29 @@
     * Recover from an unsuccessful checkpoint if necessary.
      * 
      * @param dataDirs
+     * @param editsDirs
      * @throws IOException
      */
-    void recoverCreate(Collection<File> dataDirs) throws IOException {
-      this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-      for(File dataDir : dataDirs) {
+    void recoverCreate(Collection<File> dataDirs,
+                       Collection<File> editsDirs) throws IOException {
+      Collection<File> tempDataDirs = new ArrayList<File>(dataDirs);
+      Collection<File> tempEditsDirs = new ArrayList<File>(editsDirs);
+      this.storageDirs = new ArrayList<StorageDirectory>();
+      setStorageDirectories(tempDataDirs, tempEditsDirs);
+      for (Iterator<StorageDirectory> it = 
+                   dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
         boolean isAccessible = true;
         try { // create directories if don't exist yet
-          if(!dataDir.mkdirs()) {
-            // do nothing, directory is already ctreated
+          if(!sd.getRoot().mkdirs()) {
+            // do nothing, directory is already created
           }
         } catch(SecurityException se) {
           isAccessible = false;
         }
         if(!isAccessible)
-          throw new InconsistentFSStateException(dataDir,
+          throw new InconsistentFSStateException(sd.getRoot(),
               "cannot access checkpoint directory.");
-        StorageDirectory sd = new StorageDirectory(dataDir);
         StorageState curState;
         try {
           curState = sd.analyzeStorage(StartupOption.REGULAR);
@@ -507,7 +518,7 @@
           switch(curState) {
           case NON_EXISTENT:
             // fail if any of the configured checkpoint dirs are inaccessible 
-            throw new InconsistentFSStateException(sd.root,
+            throw new InconsistentFSStateException(sd.getRoot(),
                   "checkpoint directory does not exist or is not accessible.");
           case NOT_FORMATTED:
             break;  // it's ok since initially there is no current and VERSION
@@ -520,9 +531,6 @@
           sd.unlock();
           throw ioe;
         }
-        // add to the storage list
-        addStorageDir(sd);
-        LOG.warn("Checkpoint directory " + sd.root + " is added.");
       }
     }
 
@@ -566,9 +574,19 @@
      */
     private void doMerge(CheckpointSignature sig) throws IOException {
       getEditLog().open();
-      StorageDirectory sd = getStorageDir(0);
-      loadFSImage(FSImage.getImageFile(sd, NameNodeFile.IMAGE));
-      loadFSEdits(sd);
+      StorageDirectory sdName = null;
+      StorageDirectory sdEdits = null;
+      Iterator<StorageDirectory> it = null;
+      it = dirIterator(NameNodeDirType.IMAGE);
+      if (it.hasNext())
+        sdName = it.next();
+      it = dirIterator(NameNodeDirType.EDITS);
+      if (it.hasNext())
+        sdEdits = it.next();
+      if ((sdName == null) || (sdEdits == null))
+        throw new IOException("Could not locate checkpoint directories");
+      loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      loadFSEdits(sdEdits);
       sig.validateStorageInfo(this);
       saveFSImage();
     }
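
The secondary now tracks checkpoint image and checkpoint edits locations
separately, both defaulting to /tmp/hadoop/dfs/namesecondary. The fallback
idiom is the same one getCheckpointEditsDirs uses; here is a small
self-contained sketch against the public Configuration API (the helper name
is illustrative, not a Hadoop method):

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;

public class CheckpointDirsSketch {
  // Read a comma-delimited list of directories from conf, substituting a
  // single default when the key is unset.
  static Collection<File> dirsWithDefault(Configuration conf,
                                          String key, String defaultName) {
    Collection<String> names = conf.getStringCollection(key);
    if (names.size() == 0 && defaultName != null) {
      names.add(defaultName);
    }
    Collection<File> dirs = new ArrayList<File>(names.size());
    for (String name : names) {
      dirs.add(new File(name));
    }
    return dirs;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(dirsWithDefault(conf, "fs.checkpoint.edits.dir",
                                       "/tmp/hadoop/dfs/namesecondary"));
  }
}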

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Aug 29 16:13:12 2008
@@ -96,7 +96,8 @@
   public MiniDFSCluster(Configuration conf,
                         int numDataNodes,
                         StartupOption nameNodeOperation) throws IOException {
-    this(0, conf, numDataNodes, false, false, nameNodeOperation, null, null, null);
+    this(0, conf, numDataNodes, false, false, false, nameNodeOperation,
+          null, null, null);
   }
   
   /**
@@ -116,7 +117,7 @@
                         int numDataNodes,
                         boolean format,
                         String[] racks) throws IOException {
-    this(0, conf, numDataNodes, format, true, null, racks, null, null);
+    this(0, conf, numDataNodes, format, true, true, null, racks, null, null);
   }
   
   /**
@@ -137,7 +138,7 @@
                         int numDataNodes,
                         boolean format,
                         String[] racks, String[] hosts) throws IOException {
-    this(0, conf, numDataNodes, format, true, null, racks, hosts, null);
+    this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
   }
   
   /**
@@ -165,8 +166,8 @@
                         boolean manageDfsDirs,
                         StartupOption operation,
                         String[] racks) throws IOException {
-    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, 
-        racks, null, null);
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
+         operation, racks, null, null);
   }
 
   /**
@@ -196,8 +197,8 @@
                         StartupOption operation,
                         String[] racks,
                         long[] simulatedCapacities) throws IOException {
-    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, racks, null, 
-        simulatedCapacities);
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
+          operation, racks, null, simulatedCapacities);
   }
   
   /**
@@ -212,8 +213,10 @@
    *          will be modified as necessary.
    * @param numDataNodes Number of DataNodes to start; may be zero
    * @param format if true, format the NameNode and DataNodes before starting up
-   * @param manageDfsDirs if true, the data directories for servers will be
-   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
+   * @param manageNameDfsDirs if true, the name directories for the name-node
+   *          will be created and dfs.name.dir will be set in the conf
+   * @param manageDataDfsDirs if true, the data directories for datanodes will
+   *          be created and dfs.data.dir will be set in the conf
    * @param operation the operation with which to start the servers.  If null
    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
    * @param racks array of strings indicating the rack that each DataNode is on
@@ -224,7 +227,8 @@
                         Configuration conf,
                         int numDataNodes,
                         boolean format,
-                        boolean manageDfsDirs,
+                        boolean manageNameDfsDirs,
+                        boolean manageDataDfsDirs,
                         StartupOption operation,
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
@@ -242,9 +246,11 @@
     // Setup the NameNode configuration
     FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
     conf.set("dfs.http.address", "127.0.0.1:0");  
-    if (manageDfsDirs) {
+    if (manageNameDfsDirs) {
       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
+      conf.set("fs.checkpoint.dir",
+               new File(base_dir, "namesecondary1").getPath() + "," +
+               new File(base_dir, "namesecondary2").getPath());
     }
     
     int replication = conf.getInt("dfs.replication", 3);
@@ -270,7 +276,8 @@
     nameNode = NameNode.createNameNode(args, conf);
     
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, simulatedCapacities);
+    startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
+                    operation, racks, hosts, simulatedCapacities);
     waitClusterUp();
   }
 
@@ -686,13 +693,20 @@
   }
 
   /**
-   * Get the directories where the namenode stores its state.
+   * Get the directories where the namenode stores its image.
    */
   public Collection<File> getNameDirs() {
     return FSNamesystem.getNamespaceDirs(conf);
   }
 
   /**
+   * Get the directories where the namenode stores its edits.
+   */
+  public Collection<File> getNameEditsDirs() {
+    return FSNamesystem.getNamespaceEditsDirs(conf);
+  }
+
+  /**
    * Wait until the cluster is active and running.
    */
   public void waitActive() throws IOException {
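
The MiniDFSCluster changes split the old manageDfsDirs switch in two; the
narrower constructors simply pass their single flag to both parameters. A
usage sketch of the widened constructor, in which the cluster manages the
name directories but the test supplies its own data directories (paths
illustrative and assumed writable):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class ClusterSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // manageDataDfsDirs will be false, so dfs.data.dir must be set up front.
    conf.set("dfs.data.dir", "/tmp/test-data/data1,/tmp/test-data/data2");
    MiniDFSCluster cluster = new MiniDFSCluster(
        0,      // nameNodePort: 0 picks a free port
        conf,
        1,      // numDataNodes
        true,   // format
        true,   // manageNameDfsDirs: sets dfs.name.dir and fs.checkpoint.dir
        false,  // manageDataDfsDirs: leave dfs.data.dir alone
        null,   // operation: null means StartupOption.REGULAR
        null,   // racks
        null,   // hosts
        null);  // simulatedCapacities
    try {
      cluster.waitActive();
    } finally {
      cluster.shutdown();
    }
  }
}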

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Fri Aug 29 16:13:12 2008
@@ -21,6 +21,7 @@
 import java.io.*;
 import java.util.Collection;
 import java.util.List;
+import java.util.Iterator;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -28,6 +29,8 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.ErrorSimulator;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -173,10 +176,16 @@
     // and that temporary checkpoint files are gone.
     FSImage image = cluster.getNameNode().getFSImage();
     int nrDirs = image.getNumStorageDirs();
-    for(int idx = 0; idx < nrDirs; idx++) {
-      assertFalse(image.getImageFile(idx, NameNodeFile.IMAGE_NEW).exists());
-      assertFalse(image.getEditNewFile(idx).exists());
-      File edits = image.getEditFile(idx);
+    for (Iterator<StorageDirectory> it = 
+             image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      assertFalse(FSImage.getImageFile(sd, NameNodeFile.IMAGE_NEW).exists());
+    }
+    for (Iterator<StorageDirectory> it = 
+            image.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      assertFalse(image.getEditNewFile(sd).exists());
+      File edits = image.getEditFile(sd);
       assertTrue(edits.exists()); // edits should exist and be empty
       long editsLen = edits.length();
       assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
@@ -335,7 +344,12 @@
     FSImage image = cluster.getNameNode().getFSImage();
     try {
       assertTrue(!fileSys.exists(file1));
-      long fsimageLength = image.getImageFile(0, NameNodeFile.IMAGE).length();
+      StorageDirectory sd = null;
+      for (Iterator<StorageDirectory> it = 
+                image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+         sd = it.next();
+      assertTrue(sd != null);
+      long fsimageLength = FSImage.getImageFile(sd, NameNodeFile.IMAGE).length();
       //
       // Make the checkpoint
       //
@@ -352,9 +366,9 @@
       ErrorSimulator.clearErrorSimulation(2);
 
       // Verify that image file sizes did not change.
-      int nrDirs = image.getNumStorageDirs();
-      for(int idx = 0; idx < nrDirs; idx++) {
-        assertTrue(image.getImageFile(idx, 
+      for (Iterator<StorageDirectory> it = 
+              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue(FSImage.getImageFile(it.next(), 
                                 NameNodeFile.IMAGE).length() == fsimageLength);
       }
 
@@ -385,8 +399,11 @@
   void testStartup(Configuration conf) throws IOException {
     System.out.println("Startup of the name-node in the checkpoint directory.");
     String primaryDirs = conf.get("dfs.name.dir");
+    String primaryEditsDirs = conf.get("dfs.name.edits.dir");
     String checkpointDirs = conf.get("fs.checkpoint.dir");
-    NameNode nn = startNameNode(conf, checkpointDirs, StartupOption.REGULAR);
+    String checkpointEditsDirs = conf.get("fs.checkpoint.edits.dir");
+    NameNode nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
+                                 StartupOption.REGULAR);
 
     // Starting secondary node in the same directory as the primary
     System.out.println("Startup of secondary in the same dir as the primary.");
@@ -403,7 +420,8 @@
     // Starting primary node in the same directory as the secondary
     System.out.println("Startup of primary in the same dir as the secondary.");
     // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+                        StartupOption.REGULAR);
     boolean succeed = false;
     do {
       try {
@@ -415,7 +433,8 @@
     } while(!succeed);
     nn.stop(); nn = null;
     try {
-      nn = startNameNode(conf, checkpointDirs, StartupOption.REGULAR);
+      nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
+                          StartupOption.REGULAR);
       assertFalse(nn.getFSImage().isLockSupported(0));
       nn.stop(); nn = null;
     } catch (IOException e) { // expected to fail
@@ -425,7 +444,8 @@
     // Try another secondary in the same directory
     System.out.println("Startup of two secondaries in the same dir.");
     // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+                        StartupOption.REGULAR);
     SecondaryNameNode secondary2 = null;
     try {
       secondary2 = startSecondaryNameNode(conf);
@@ -440,15 +460,18 @@
     // Import a checkpoint with existing primary image.
     System.out.println("Import a checkpoint with existing primary image.");
     try {
-      nn = startNameNode(conf, primaryDirs, StartupOption.IMPORT);
+      nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+                          StartupOption.IMPORT);
       assertTrue(false);
     } catch (IOException e) { // expected to fail
       assertTrue(nn == null);
     }
-
+    
     // Remove current image and import a checkpoint.
     System.out.println("Import a checkpoint with existing primary image.");
     List<File> nameDirs = (List<File>)FSNamesystem.getNamespaceDirs(conf);
+    List<File> nameEditsDirs = (List<File>)FSNamesystem.
+                                  getNamespaceEditsDirs(conf);
     long fsimageLength = new File(new File(nameDirs.get(0), "current"), 
                                         NameNodeFile.IMAGE.getName()).length();
     for(File dir : nameDirs) {
@@ -458,18 +481,29 @@
       if (!dir.mkdirs())
         throw new IOException("Cannot create directory " + dir);
     }
-    nn = startNameNode(conf, primaryDirs, StartupOption.IMPORT);
+
+    for(File dir : nameEditsDirs) {
+      if(dir.exists())
+        if(!(FileUtil.fullyDelete(dir)))
+          throw new IOException("Cannot remove directory: " + dir);
+      if (!dir.mkdirs())
+        throw new IOException("Cannot create directory " + dir);
+    }
+    
+    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+                        StartupOption.IMPORT);
     // Verify that image file sizes did not change.
     FSImage image = nn.getFSImage();
-    int nrDirs = image.getNumStorageDirs();
-    for(int idx = 0; idx < nrDirs; idx++) {
-      assertTrue(image.getImageFile(idx, 
-                              NameNodeFile.IMAGE).length() == fsimageLength);
+    for (Iterator<StorageDirectory> it = 
+            image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      assertTrue(FSImage.getImageFile(it.next(), 
+                          NameNodeFile.IMAGE).length() == fsimageLength);
     }
     nn.stop();
 
     // recover failed checkpoint
-    nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+                        StartupOption.REGULAR);
     Collection<File> secondaryDirs = FSImage.getCheckpointDirs(conf, null);
     for(File dir : secondaryDirs) {
       Storage.rename(new File(dir, "current"), 
@@ -504,10 +538,12 @@
 
   NameNode startNameNode( Configuration conf,
                           String imageDirs,
+                          String editsDirs,
                           StartupOption start) throws IOException {
     conf.set("fs.default.name", "hdfs://localhost:0");
     conf.set("dfs.http.address", "0.0.0.0:0");  
     conf.set("dfs.name.dir", imageDirs);
+    conf.set("dfs.name.edits.dir", editsDirs);
     String[] args = new String[]{start.getName()};
     NameNode nn = NameNode.createNameNode(args, conf);
     assertTrue(nn.isInSafeMode());
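
startNameNode now threads an editsDirs string through to dfs.name.edits.dir
before calling NameNode.createNameNode. The same recipe applies outside the
test; a sketch with illustrative paths, where the string argument matches
StartupOption.REGULAR.getName() as used by the helper above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

public class StartNameNodeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:0");
    conf.set("dfs.http.address", "0.0.0.0:0");
    conf.set("dfs.name.dir", "/disk1/dfs/name");
    conf.set("dfs.name.edits.dir", "/disk2/dfs/edits");
    // Regular startup, mirroring the test helper; assumes the name
    // directories have already been formatted.
    NameNode nn = NameNode.createNameNode(new String[]{"-regular"}, conf);
    try {
      System.out.println("name-node started");
    } finally {
      nn.stop();
    }
  }
}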

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Aug 29 16:13:12 2008
@@ -31,6 +31,9 @@
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
 
 /**
  * This class tests the creation and validation of a checkpoint.
@@ -85,6 +88,7 @@
     // start a cluster 
 
     Collection<File> namedirs = null;
+    Collection<File> editsdirs = null;
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes, 
                                                 true, true, null, null);
@@ -94,6 +98,7 @@
 
     try {
       namedirs = cluster.getNameDirs();
+      editsdirs = cluster.getNameEditsDirs();
     } finally {
       fileSys.close();
       cluster.shutdown();
@@ -105,7 +110,7 @@
       numdirs++;
     }
 
-    FSImage fsimage = new FSImage(namedirs);
+    FSImage fsimage = new FSImage(namedirs, editsdirs);
     FSEditLog editLog = fsimage.getEditLog();
 
     // set small size of flush buffer
@@ -136,8 +141,9 @@
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.
     //
-    for (int i = 0; i < numdirs; i++) {
-      File editFile = fsimage.getEditFile(i);
+    for (Iterator<StorageDirectory> it = 
+            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
       System.out.println("Verifying file: " + editFile);
       int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile));
       int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();