You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 00:32:14 UTC

svn commit: r1293964 [7/11] - in /hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/...

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sun Feb 26 23:32:06 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -1069,6 +1072,112 @@ public class FSEditLog  {
   }
 
   /**
+   * Find the best editlog input stream to read from txid. In this case
+   * best means the editlog which has the largest continuous range of 
+   * transactions starting from the transaction id, fromTxId.
+   *
+   * If a journal throws an CorruptionException while reading from a txn id,
+   * it means that it has more transactions, but can't find any from fromTxId. 
+   * If this is the case and no other journal has transactions, we should throw
+   * an exception as it means more transactions exist, we just can't load them.
+   *
+   * @param fromTxId Transaction id to start from.
+   * @return a edit log input stream with tranactions fromTxId 
+   *         or null if no more exist
+   */
+  private EditLogInputStream selectStream(long fromTxId) 
+      throws IOException {
+    JournalManager bestjm = null;
+    long bestjmNumTxns = 0;
+    CorruptionException corruption = null;
+
+    for (JournalAndStream jas : journals) {
+      JournalManager candidate = jas.getManager();
+      long candidateNumTxns = 0;
+      try {
+        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
+      } catch (CorruptionException ce) {
+        corruption = ce;
+      } catch (IOException ioe) {
+        LOG.warn("Error reading number of transactions from " + candidate);
+        continue; // error reading disk, just skip
+      }
+      
+      if (candidateNumTxns > bestjmNumTxns) {
+        bestjm = candidate;
+        bestjmNumTxns = candidateNumTxns;
+      }
+    }
+    
+    
+    if (bestjm == null) {
+      /**
+       * If all candidates either threw a CorruptionException or
+       * found 0 transactions, then a gap exists. 
+       */
+      if (corruption != null) {
+        throw new IOException("Gap exists in logs from " 
+                              + fromTxId, corruption);
+      } else {
+        return null;
+      }
+    }
+
+    return bestjm.getInputStream(fromTxId);
+  }
+
+  /**
+   * Run recovery on all journals to recover any unclosed segments
+   */
+  void recoverUnclosedStreams() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.manager.recoverUnfinalizedSegments();
+        }
+      }, "recovering unclosed streams");
+  }
+
+  /**
+   * Select a list of input streams to load.
+   * @param fromTxId first transaction in the selected streams
+   * @param toAtLeast the selected streams must contain this transaction
+   */
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
+      throws IOException {
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    
+    boolean gapFound = false;
+    EditLogInputStream stream = selectStream(fromTxId);
+    while (stream != null) {
+      fromTxId = stream.getLastTxId() + 1;
+      streams.add(stream);
+      try {
+        stream = selectStream(fromTxId);
+      } catch (IOException ioe) {
+        gapFound = true;
+        break;
+      }
+    }
+    if (fromTxId <= toAtLeastTxId || gapFound) {
+      closeAllStreams(streams);
+      throw new IOException("No non-corrupt logs for txid " 
+                            + fromTxId);
+    }
+    return streams;
+  }
+
+  /** 
+   * Close all the streams in a collection
+   * @param streams The list of streams to close
+   */
+  static void closeAllStreams(Iterable<EditLogInputStream> streams) {
+    for (EditLogInputStream s : streams) {
+      IOUtils.closeStream(s);
+    }
+  }
+
+  /**
    * Container for a JournalManager paired with its currently
    * active stream.
    * 
@@ -1137,30 +1246,5 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
-
-    private EditLogInputStream getInProgressInputStream() throws IOException {
-      return manager.getInProgressInputStream(segmentStartsAtTxId);
-    }
-  }
-
-  /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. This is used from the BackupNode
-   * during edits synchronization.
-   * @throws IOException if no valid logs are available.
-   */
-  synchronized EditLogInputStream getInProgressFileInputStream()
-      throws IOException {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      try {
-        EditLogInputStream in = jas.getInProgressInputStream();
-        if (in != null) return in;
-      } catch (IOException ioe) {
-        LOG.warn("Unable to get the in-progress input stream from " + jas,
-            ioe);
-      }
-    }
-    throw new IOException("No in-progress stream provided edits");
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Sun Feb 26 23:32:06 2012
@@ -467,24 +467,6 @@ public class FSEditLogLoader {
     }
   }
   
-  static EditLogValidation validateEditLog(File file) throws IOException {
-    EditLogFileInputStream in;
-    try {
-      in = new EditLogFileInputStream(file);
-    } catch (LogHeaderCorruptException corrupt) {
-      // If it's missing its header, this is equivalent to no transactions
-      FSImage.LOG.warn("Log at " + file + " has no valid header",
-          corrupt);
-      return new EditLogValidation(0, 0);
-    }
-    
-    try {
-      return validateEditLog(in);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
   /**
    * Return the number of valid transactions in the stream. If the stream is
    * truncated during the header, returns a value indicating that there are
@@ -494,12 +476,26 @@ public class FSEditLogLoader {
    *                     if the log does not exist)
    */
   static EditLogValidation validateEditLog(EditLogInputStream in) {
-    long numValid = 0;
     long lastPos = 0;
+    long firstTxId = HdfsConstants.INVALID_TXID;
+    long lastTxId = HdfsConstants.INVALID_TXID;
+    long numValid = 0;
     try {
+      FSEditLogOp op = null;
       while (true) {
         lastPos = in.getPosition();
-        if (in.readOp() == null) {
+        if ((op = in.readOp()) == null) {
+          break;
+        }
+        if (firstTxId == HdfsConstants.INVALID_TXID) {
+          firstTxId = op.txid;
+        }
+        if (lastTxId == HdfsConstants.INVALID_TXID
+            || op.txid == lastTxId + 1) {
+          lastTxId = op.txid;
+        } else {
+          FSImage.LOG.error("Out of order txid found. Found " + op.txid 
+                            + ", expected " + (lastTxId + 1));
           break;
         }
         numValid++;
@@ -510,16 +506,33 @@ public class FSEditLogLoader {
       FSImage.LOG.debug("Caught exception after reading " + numValid +
           " ops from " + in + " while determining its valid length.", t);
     }
-    return new EditLogValidation(lastPos, numValid);
+    return new EditLogValidation(lastPos, firstTxId, lastTxId);
   }
   
   static class EditLogValidation {
-    long validLength;
-    long numTransactions;
-    
-    EditLogValidation(long validLength, long numTransactions) {
+    private long validLength;
+    private long startTxId;
+    private long endTxId;
+     
+    EditLogValidation(long validLength, 
+                      long startTxId, long endTxId) {
       this.validLength = validLength;
-      this.numTransactions = numTransactions;
+      this.startTxId = startTxId;
+      this.endTxId = endTxId;
+    }
+    
+    long getValidLength() { return validLength; }
+    
+    long getStartTxId() { return startTxId; }
+    
+    long getEndTxId() { return endTxId; }
+    
+    long getNumTransactions() { 
+      if (endTxId == HdfsConstants.INVALID_TXID
+          || startTxId == HdfsConstants.INVALID_TXID) {
+        return 0;
+      }
+      return (endTxId - startTxId) + 1;
     }
   }
 

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sun Feb 26 23:32:06 2012
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.com
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
+
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -70,7 +70,6 @@ import com.google.common.collect.Lists;
 public class FSImage implements Closeable {
   protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
-  protected FSNamesystem namesystem = null;
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
 
@@ -82,38 +81,20 @@ public class FSImage implements Closeabl
    */
   protected long lastAppliedTxId = 0;
 
-  /**
-   * URIs for importing an image from a checkpoint. In the default case,
-   * URIs will represent directories. 
-   */
-  private Collection<URI> checkpointDirs;
-  private Collection<URI> checkpointEditsDirs;
-
   final private Configuration conf;
 
   private final NNStorageRetentionManager archivalManager; 
 
-  /**
-   * Construct an FSImage.
-   * @param conf Configuration
-   * @see #FSImage(Configuration conf, FSNamesystem ns, 
-   *               Collection imageDirs, Collection editsDirs) 
-   * @throws IOException if default directories are invalid.
-   */
-  public FSImage(Configuration conf) throws IOException {
-    this(conf, (FSNamesystem)null);
-  }
 
   /**
    * Construct an FSImage
    * @param conf Configuration
-   * @param ns The FSNamesystem using this image.
-   * @see #FSImage(Configuration conf, FSNamesystem ns, 
+   * @see #FSImage(Configuration conf, 
    *               Collection imageDirs, Collection editsDirs) 
    * @throws IOException if default directories are invalid.
    */
-  private FSImage(Configuration conf, FSNamesystem ns) throws IOException {
-    this(conf, ns,
+  protected FSImage(Configuration conf) throws IOException {
+    this(conf,
          FSNamesystem.getNamespaceDirs(conf),
          FSNamesystem.getNamespaceEditsDirs(conf));
   }
@@ -124,17 +105,14 @@ public class FSImage implements Closeabl
    * Setup storage and initialize the edit log.
    *
    * @param conf Configuration
-   * @param ns The FSNamesystem using this image.
    * @param imageDirs Directories the image can be stored in.
    * @param editsDirs Directories the editlog can be stored in.
    * @throws IOException if directories are invalid.
    */
-  protected FSImage(Configuration conf, FSNamesystem ns,
+  protected FSImage(Configuration conf,
                     Collection<URI> imageDirs, Collection<URI> editsDirs)
       throws IOException {
     this.conf = conf;
-    setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
-                             FSImage.getCheckpointEditsDirs(conf, null));
 
     storage = new NNStorage(conf, imageDirs, editsDirs);
     if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
@@ -143,31 +121,18 @@ public class FSImage implements Closeabl
     }
 
     this.editLog = new FSEditLog(storage);
-    setFSNamesystem(ns);
     
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
-
-  protected FSNamesystem getFSNamesystem() {
-    return namesystem;
-  }
-
-  void setFSNamesystem(FSNamesystem ns) {
-    namesystem = ns;
-    if (ns != null) {
-      storage.setUpgradeManager(ns.upgradeManager);
-    }
-  }
  
-  void setCheckpointDirectories(Collection<URI> dirs,
-                                Collection<URI> editsDirs) {
-    checkpointDirs = dirs;
-    checkpointEditsDirs = editsDirs;
-  }
-  
-  void format(String clusterId) throws IOException {
+  void format(FSNamesystem fsn, String clusterId) throws IOException {
+    long fileCount = fsn.getTotalFiles();
+    // Expect 1 file, which is the root inode
+    Preconditions.checkState(fileCount == 1,
+        "FSImage.format should be called with an uninitialized namesystem, has " +
+        fileCount + " files");
     storage.format(clusterId);
-    saveFSImageInAllDirs(0);    
+    saveFSImageInAllDirs(fsn, 0);
   }
   
   /**
@@ -179,7 +144,7 @@ public class FSImage implements Closeabl
    * @throws IOException
    * @return true if the image needs to be saved or false otherwise
    */
-  boolean recoverTransitionRead(StartupOption startOpt)
+  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
       throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
@@ -187,21 +152,14 @@ public class FSImage implements Closeabl
     Collection<URI> imageDirs = storage.getImageDirectories();
     Collection<URI> editsDirs = storage.getEditsDirectories();
 
+
     // none of the data dirs exist
     if((imageDirs.size() == 0 || editsDirs.size() == 0) 
                              && startOpt != StartupOption.IMPORT)  
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
     
-    if(startOpt == StartupOption.IMPORT 
-        && (checkpointDirs == null || checkpointDirs.isEmpty()))
-      throw new IOException("Cannot import image from a checkpoint. "
-                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
-
-    if(startOpt == StartupOption.IMPORT 
-        && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
-      throw new IOException("Cannot import image from a checkpoint. "
-                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
+    storage.setUpgradeManager(target.upgradeManager);
     
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
@@ -261,10 +219,10 @@ public class FSImage implements Closeabl
     // 3. Do transitions
     switch(startOpt) {
     case UPGRADE:
-      doUpgrade();
+      doUpgrade(target);
       return false; // upgrade saved image already
     case IMPORT:
-      doImportCheckpoint();
+      doImportCheckpoint(target);
       return false; // import checkpoint saved image already
     case ROLLBACK:
       doRollback();
@@ -273,7 +231,7 @@ public class FSImage implements Closeabl
       // just load the image
     }
     
-    return loadFSImage();
+    return loadFSImage(target);
   }
   
   /**
@@ -324,11 +282,11 @@ public class FSImage implements Closeabl
     return isFormatted;
   }
 
-  private void doUpgrade() throws IOException {
+  private void doUpgrade(FSNamesystem target) throws IOException {
     if(storage.getDistributedUpgradeState()) {
       // only distributed upgrade need to continue
       // don't do version upgrade
-      this.loadFSImage();
+      this.loadFSImage(target);
       storage.initializeDistributedUpgrade();
       return;
     }
@@ -343,7 +301,7 @@ public class FSImage implements Closeabl
     }
 
     // load the latest image
-    this.loadFSImage();
+    this.loadFSImage(target);
 
     // Do upgrade for each directory
     long oldCTime = storage.getCTime();
@@ -385,7 +343,7 @@ public class FSImage implements Closeabl
     storage.reportErrorsOnDirectories(errorSDs);
     errorSDs.clear();
 
-    saveFSImageInAllDirs(editLog.getLastWrittenTxId());
+    saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
 
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -422,7 +380,7 @@ public class FSImage implements Closeabl
     // a previous fs states in at least one of the storage directories.
     // Directories that don't have previous state do not rollback
     boolean canRollback = false;
-    FSImage prevState = new FSImage(conf, getFSNamesystem());
+    FSImage prevState = new FSImage(conf);
     prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -504,19 +462,32 @@ public class FSImage implements Closeabl
 
   /**
    * Load image from a checkpoint directory and save it into the current one.
+   * @param target the NameSystem to import into
    * @throws IOException
    */
-  void doImportCheckpoint() throws IOException {
-    FSNamesystem fsNamesys = getFSNamesystem();
-    FSImage ckptImage = new FSImage(conf, fsNamesys,
+  void doImportCheckpoint(FSNamesystem target) throws IOException {
+    Collection<URI> checkpointDirs =
+      FSImage.getCheckpointDirs(conf, null);
+    Collection<URI> checkpointEditsDirs =
+      FSImage.getCheckpointEditsDirs(conf, null);
+
+    if (checkpointDirs == null || checkpointDirs.isEmpty()) {
+      throw new IOException("Cannot import image from a checkpoint. "
+                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
+    }
+    
+    if (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()) {
+      throw new IOException("Cannot import image from a checkpoint. "
+                            + "\"dfs.namenode.checkpoint.dir\" is not set." );
+    }
+
+    FSImage realImage = target.getFSImage();
+    FSImage ckptImage = new FSImage(conf, 
                                     checkpointDirs, checkpointEditsDirs);
-    // replace real image with the checkpoint image
-    FSImage realImage = fsNamesys.getFSImage();
-    assert realImage == this;
-    fsNamesys.dir.fsImage = ckptImage;
+    target.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
-      ckptImage.recoverTransitionRead(StartupOption.REGULAR);
+      ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
     } finally {
       ckptImage.close();
     }
@@ -524,10 +495,11 @@ public class FSImage implements Closeabl
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
     realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
 
-    fsNamesys.dir.fsImage = realImage;
+    target.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
+
     // and save it but keep the same checkpointTime
-    saveNamespace();
+    saveNamespace(target);
     getStorage().writeAll();
   }
 
@@ -558,11 +530,11 @@ public class FSImage implements Closeabl
    * Toss the current image and namesystem, reloading from the specified
    * file.
    */
-  void reloadFromImageFile(File file) throws IOException {
-    namesystem.dir.reset();
+  void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
+    target.dir.reset();
 
     LOG.debug("Reloading namespace from " + file);
-    loadFSImage(file);
+    loadFSImage(file, target);
   }
 
   /**
@@ -580,36 +552,42 @@ public class FSImage implements Closeabl
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage() throws IOException {
+  boolean loadFSImage(FSNamesystem target) throws IOException {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
     isUpgradeFinalized = inspector.isUpgradeFinalized();
-    
+ 
+    FSImageStorageInspector.FSImageFile imageFile 
+      = inspector.getLatestImage();   
     boolean needToSave = inspector.needToSave();
-    
-    // Plan our load. This will throw if it's impossible to load from the
-    // data that's available.
-    LoadPlan loadPlan = inspector.createLoadPlan();    
-    LOG.debug("Planning to load image using following plan:\n" + loadPlan);
-
-    
-    // Recover from previous interrupted checkpoint, if any
-    needToSave |= loadPlan.doRecovery();
-
-    //
-    // Load in bits
-    //
-    StorageDirectory sdForProperties =
-      loadPlan.getStorageDirectoryForProperties();
-    storage.readProperties(sdForProperties);
-    File imageFile = loadPlan.getImageFile();
 
+    Iterable<EditLogInputStream> editStreams = null;
+
+    editLog.recoverUnclosedStreams();
+
+    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
+                               getLayoutVersion())) {
+      editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
+                                               inspector.getMaxSeenTxId());
+    } else {
+      editStreams = FSImagePreTransactionalStorageInspector
+        .getEditLogStreams(storage);
+    }
+ 
+    LOG.debug("Planning to load image :\n" + imageFile);
+    for (EditLogInputStream l : editStreams) {
+      LOG.debug("\t Planning to load edit stream: " + l);
+    }
+    
     try {
+      StorageDirectory sdForProperties = imageFile.sd;
+      storage.readProperties(sdForProperties);
+
       if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
                                  getLayoutVersion())) {
         // For txid-based layout, we should have a .md5 file
         // next to the image file
-        loadFSImage(imageFile);
+        loadFSImage(imageFile.getFile(), target);
       } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
                                         getLayoutVersion())) {
         // In 0.22, we have the checksum stored in the VERSION file.
@@ -621,17 +599,19 @@ public class FSImage implements Closeabl
               NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
               " not set for storage directory " + sdForProperties.getRoot());
         }
-        loadFSImage(imageFile, new MD5Hash(md5));
+        loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
       } else {
         // We don't have any record of the md5sum
-        loadFSImage(imageFile, null);
+        loadFSImage(imageFile.getFile(), null, target);
       }
     } catch (IOException ioe) {
-      throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
+      FSEditLog.closeAllStreams(editStreams);
+      throw new IOException("Failed to load image from " + imageFile, ioe);
     }
     
-    long numLoaded = loadEdits(loadPlan.getEditsFiles());
-    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
+    long numLoaded = loadEdits(editStreams, target);
+    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
+                                                    numLoaded);
     
     // update the txid for the edit log
     editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
@@ -663,26 +643,30 @@ public class FSImage implements Closeabl
    * Load the specified list of edit files into the image.
    * @return the number of transactions loaded
    */
-  protected long loadEdits(List<File> editLogs) throws IOException {
-    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
+  protected long loadEdits(Iterable<EditLogInputStream> editStreams,
+                           FSNamesystem target) throws IOException {
+    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
 
     long startingTxId = getLastAppliedTxId() + 1;
-    
-    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
     int numLoaded = 0;
-    // Load latest edits
-    for (File edits : editLogs) {
-      LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
-      EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
-      int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-      startingTxId += thisNumLoaded;
-      numLoaded += thisNumLoaded;
-      lastAppliedTxId += thisNumLoaded;
-      editIn.close();
+
+    try {    
+      FSEditLogLoader loader = new FSEditLogLoader(target);
+      
+      // Load latest edits
+      for (EditLogInputStream editIn : editStreams) {
+        LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
+        int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+        startingTxId += thisNumLoaded;
+        numLoaded += thisNumLoaded;
+        lastAppliedTxId += thisNumLoaded;
+      }
+    } finally {
+      FSEditLog.closeAllStreams(editStreams);
     }
 
     // update the counts
-    getFSNamesystem().dir.updateCountForINodeWithQuota();    
+    target.dir.updateCountForINodeWithQuota();    
     return numLoaded;
   }
 
@@ -691,13 +675,14 @@ public class FSImage implements Closeabl
    * Load the image namespace from the given image file, verifying
    * it against the MD5 sum stored in its associated .md5 file.
    */
-  private void loadFSImage(File imageFile) throws IOException {
+  private void loadFSImage(File imageFile, FSNamesystem target)
+      throws IOException {
     MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
     if (expectedMD5 == null) {
       throw new IOException("No MD5 file found corresponding to image file "
           + imageFile);
     }
-    loadFSImage(imageFile, expectedMD5);
+    loadFSImage(imageFile, expectedMD5, target);
   }
   
   /**
@@ -705,11 +690,12 @@ public class FSImage implements Closeabl
    * filenames and blocks.  Return whether we should
    * "re-save" and consolidate the edit-logs
    */
-  private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
+  private void loadFSImage(File curFile, MD5Hash expectedMd5,
+      FSNamesystem target) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
-        conf, getFSNamesystem());
+        conf, target);
     loader.load(curFile);
-    namesystem.setBlockPoolId(this.getBlockPoolID());
+    target.setBlockPoolId(this.getBlockPoolID());
 
     // Check that the image digest we loaded matches up with what
     // we expected
@@ -730,13 +716,14 @@ public class FSImage implements Closeabl
   /**
    * Save the contents of the FS image to the file.
    */
-  void saveFSImage(StorageDirectory sd, long txid) throws IOException {
+  void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
+      throws IOException {
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
     File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
     
     FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    saver.save(newFile, txid, getFSNamesystem(), compression);
+    saver.save(newFile, txid, source, compression);
     
     MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
     storage.setMostRecentCheckpointInfo(txid, Util.now());
@@ -757,8 +744,11 @@ public class FSImage implements Closeabl
     private StorageDirectory sd;
     private List<StorageDirectory> errorSDs;
     private final long txid;
+    private final FSNamesystem source;
     
-    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs, long txid) {
+    FSImageSaver(FSNamesystem source, StorageDirectory sd,
+        List<StorageDirectory> errorSDs, long txid) {
+      this.source = source;
       this.sd = sd;
       this.errorSDs = errorSDs;
       this.txid = txid;
@@ -766,7 +756,7 @@ public class FSImage implements Closeabl
     
     public void run() {
       try {
-        saveFSImage(sd, txid);
+        saveFSImage(source, sd, txid);
       } catch (Throwable t) {
         LOG.error("Unable to save image for " + sd.getRoot(), t);
         errorSDs.add(sd);
@@ -795,7 +785,7 @@ public class FSImage implements Closeabl
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
    */
-  void saveNamespace() throws IOException {
+  void saveNamespace(FSNamesystem source) throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
@@ -806,7 +796,7 @@ public class FSImage implements Closeabl
     }
     long imageTxId = editLog.getLastWrittenTxId();
     try {
-      saveFSImageInAllDirs(imageTxId);
+      saveFSImageInAllDirs(source, imageTxId);
       storage.writeAll();
     } finally {
       if (editLogWasOpen) {
@@ -818,7 +808,8 @@ public class FSImage implements Closeabl
     
   }
   
-  protected void saveFSImageInAllDirs(long txid) throws IOException {
+  protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
+      throws IOException {
     if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
       throw new IOException("No image directories available!");
     }
@@ -831,7 +822,7 @@ public class FSImage implements Closeabl
     for (Iterator<StorageDirectory> it
            = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      FSImageSaver saver = new FSImageSaver(sd, errorSDs, txid);
+      FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
       Thread saveThread = new Thread(saver, saver.toString());
       saveThreads.add(saveThread);
       saveThread.start();

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Sun Feb 26 23:32:06 2012
@@ -556,8 +556,13 @@ class FSImageFormat {
       DataOutputStream out = new DataOutputStream(fos);
       try {
         out.writeInt(HdfsConstants.LAYOUT_VERSION);
-        out.writeInt(sourceNamesystem.getFSImage()
-                     .getStorage().getNamespaceID()); // TODO bad dependency
+        // We use the non-locked version of getNamespaceInfo here since
+        // the coordinating thread of saveNamespace already has read-locked
+        // the namespace for us. If we attempt to take another readlock
+        // from the actual saver thread, there's a potential of a
+        // fairness-related deadlock. See the comments on HDFS-2223.
+        out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
+            .getNamespaceID());
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
         out.writeLong(txid);

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInsp
   private boolean hasOutOfDateStorageDirs = false;
   /* Flag set false if there are any "previous" directories found */
   private boolean isUpgradeFinalized = true;
+  private boolean needToSaveAfterRecovery = false;
   
   // Track the name and edits dir with the latest times
   private long latestNameCheckpointTime = Long.MIN_VALUE;
@@ -139,15 +141,15 @@ class FSImagePreTransactionalStorageInsp
   boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
   }
-  
+    
   @Override
-  LoadPlan createLoadPlan() throws IOException {
+  FSImageFile getLatestImage() throws IOException {
     // We should have at least one image and one edits dirs
     if (latestNameSD == null)
       throw new IOException("Image file is not found in " + imageDirs);
     if (latestEditsSD == null)
       throw new IOException("Edits file is not found in " + editsDirs);
-
+    
     // Make sure we are loading image and edits from same checkpoint
     if (latestNameCheckpointTime > latestEditsCheckpointTime
         && latestNameSD != latestEditsSD
@@ -168,92 +170,70 @@ class FSImagePreTransactionalStorageInsp
                       "image checkpoint time = " + latestNameCheckpointTime +
                       "edits checkpoint time = " + latestEditsCheckpointTime);
     }
+
+    needToSaveAfterRecovery = doRecovery();
     
-    return new PreTransactionalLoadPlan();
+    return new FSImageFile(latestNameSD, 
+        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
+        HdfsConstants.INVALID_TXID);
   }
-  
+
   @Override
   boolean needToSave() {
     return hasOutOfDateStorageDirs ||
       checkpointTimes.size() != 1 ||
-      latestNameCheckpointTime > latestEditsCheckpointTime;
-
+      latestNameCheckpointTime > latestEditsCheckpointTime ||
+      needToSaveAfterRecovery;
   }
   
-  private class PreTransactionalLoadPlan extends LoadPlan {
-
-    @Override
-    boolean doRecovery() throws IOException {
-      LOG.debug(
+  boolean doRecovery() throws IOException {
+    LOG.debug(
         "Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
       
-      boolean needToSave = false;
-      File curFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
-      File ckptFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
-
-      //
-      // If we were in the midst of a checkpoint
-      //
-      if (ckptFile.exists()) {
-        needToSave = true;
-        if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
-              .exists()) {
-          //
-          // checkpointing migth have uploaded a new
-          // merged image, but we discard it here because we are
-          // not sure whether the entire merged image was uploaded
-          // before the namenode crashed.
-          //
-          if (!ckptFile.delete()) {
-            throw new IOException("Unable to delete " + ckptFile);
-          }
-        } else {
-          //
-          // checkpointing was in progress when the namenode
-          // shutdown. The fsimage.ckpt was created and the edits.new
-          // file was moved to edits. We complete that checkpoint by
-          // moving fsimage.new to fsimage. There is no need to 
-          // update the fstime file here. renameTo fails on Windows
-          // if the destination file already exists.
-          //
+    boolean needToSave = false;
+    File curFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+    File ckptFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
+    
+    //
+    // If we were in the midst of a checkpoint
+    //
+    if (ckptFile.exists()) {
+      needToSave = true;
+      if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
+          .exists()) {
+        //
+        // checkpointing migth have uploaded a new
+        // merged image, but we discard it here because we are
+        // not sure whether the entire merged image was uploaded
+        // before the namenode crashed.
+        //
+        if (!ckptFile.delete()) {
+          throw new IOException("Unable to delete " + ckptFile);
+        }
+      } else {
+        //
+        // checkpointing was in progress when the namenode
+        // shutdown. The fsimage.ckpt was created and the edits.new
+        // file was moved to edits. We complete that checkpoint by
+        // moving fsimage.new to fsimage. There is no need to 
+        // update the fstime file here. renameTo fails on Windows
+        // if the destination file already exists.
+        //
+        if (!ckptFile.renameTo(curFile)) {
+          if (!curFile.delete())
+            LOG.warn("Unable to delete dir " + curFile + " before rename");
           if (!ckptFile.renameTo(curFile)) {
-            if (!curFile.delete())
-              LOG.warn("Unable to delete dir " + curFile + " before rename");
-            if (!ckptFile.renameTo(curFile)) {
-              throw new IOException("Unable to rename " + ckptFile +
-                                    " to " + curFile);
-            }
+            throw new IOException("Unable to rename " + ckptFile +
+                                  " to " + curFile);
           }
         }
       }
-      return needToSave;
-    }
-
-    @Override
-    File getImageFile() {
-      return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
     }
-
-    @Override
-    List<File> getEditsFiles() {
-      if (latestNameCheckpointTime > latestEditsCheckpointTime) {
-        // the image is already current, discard edits
-        LOG.debug(
-          "Name checkpoint time is newer than edits, not loading edits.");
-        return Collections.<File>emptyList();
-      }
-      
-      return getEditsInStorageDir(latestEditsSD);
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return latestNameSD;
-    }    
+    return needToSave;
   }
-
+  
   /**
    * @return a list with the paths to EDITS and EDITS_NEW (if it exists)
    * in a given storage directory.
@@ -269,4 +249,33 @@ class FSImagePreTransactionalStorageInsp
     }
     return files;
   }
+  
+  private List<File> getLatestEditsFiles() {
+    if (latestNameCheckpointTime > latestEditsCheckpointTime) {
+      // the image is already current, discard edits
+      LOG.debug(
+          "Name checkpoint time is newer than edits, not loading edits.");
+      return Collections.<File>emptyList();
+    }
+    
+    return getEditsInStorageDir(latestEditsSD);
+  }
+  
+  @Override
+  long getMaxSeenTxId() {
+    return 0L;
+  }
+
+  static Iterable<EditLogInputStream> getEditLogStreams(NNStorage storage)
+      throws IOException {
+    FSImagePreTransactionalStorageInspector inspector 
+      = new FSImagePreTransactionalStorageInspector();
+    storage.inspectStorageDirs(inspector);
+
+    List<EditLogInputStream> editStreams = new ArrayList<EditLogInputStream>();
+    for (File f : inspector.getLatestEditsFiles()) {
+      editStreams.add(new EditLogFileInputStream(f));
+    }
+    return editStreams;
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -43,11 +44,16 @@ abstract class FSImageStorageInspector {
   abstract boolean isUpgradeFinalized();
   
   /**
-   * Create a plan to load the image from the set of inspected storage directories.
+   * Get the image files which should be loaded into the filesystem.
    * @throws IOException if not enough files are available (eg no image found in any directory)
    */
-  abstract LoadPlan createLoadPlan() throws IOException;
-  
+  abstract FSImageFile getLatestImage() throws IOException;
+
+  /** 
+   * Get the minimum tx id which should be loaded with this set of images.
+   */
+  abstract long getMaxSeenTxId();
+
   /**
    * @return true if the directories are in such a state that the image should be re-saved
    * following the load
@@ -55,49 +61,6 @@ abstract class FSImageStorageInspector {
   abstract boolean needToSave();
 
   /**
-   * A plan to load the namespace from disk, providing the locations from which to load
-   * the image and a set of edits files.
-   */
-  abstract static class LoadPlan {
-    /**
-     * Execute atomic move sequence in the chosen storage directories,
-     * in order to recover from an interrupted checkpoint.
-     * @return true if some recovery action was taken
-     */
-    abstract boolean doRecovery() throws IOException;
-
-    /**
-     * @return the file from which to load the image data
-     */
-    abstract File getImageFile();
-    
-    /**
-     * @return a list of flies containing edits to replay
-     */
-    abstract List<File> getEditsFiles();
-    
-    /**
-     * @return the storage directory containing the VERSION file that should be
-     * loaded.
-     */
-    abstract StorageDirectory getStorageDirectoryForProperties();
-    
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Will load image file: ").append(getImageFile()).append("\n");
-      sb.append("Will load edits files:").append("\n");
-      for (File f : getEditsFiles()) {
-        sb.append("  ").append(f).append("\n");
-      }
-      sb.append("Will load metadata from: ")
-        .append(getStorageDirectoryForProperties())
-        .append("\n");
-      return sb.toString();
-    }
-  }
-
-  /**
    * Record of an image that has been located and had its filename parsed.
    */
   static class FSImageFile {
@@ -106,7 +69,8 @@ abstract class FSImageStorageInspector {
     private final File file;
     
     FSImageFile(StorageDirectory sd, File file, long txId) {
-      assert txId >= 0 : "Invalid txid on " + file +": " + txId;
+      assert txId >= 0 || txId == HdfsConstants.INVALID_TXID 
+        : "Invalid txid on " + file +": " + txId;
       
       this.sd = sd;
       this.txId = txId;

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -52,9 +51,7 @@ class FSImageTransactionalStorageInspect
   private boolean isUpgradeFinalized = true;
   
   List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
-  List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
-  SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
-  long maxSeenTxId = 0;
+  private long maxSeenTxId = 0;
   
   private static final Pattern IMAGE_REGEX = Pattern.compile(
     NameNodeFile.IMAGE.getName() + "_(\\d+)");
@@ -68,6 +65,8 @@ class FSImageTransactionalStorageInspect
       return;
     }
     
+    maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+
     File currentDir = sd.getCurrentDir();
     File filesInStorage[];
     try {
@@ -110,34 +109,10 @@ class FSImageTransactionalStorageInspect
       LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
-    List<EditLogFile> editLogs 
-      = FileJournalManager.matchEditLogs(filesInStorage);
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      for (EditLogFile log : editLogs) {
-        addEditLog(log);
-      }
-    } else if (!editLogs.isEmpty()){
-      LOG.warn("Found the following edit log file(s) in " + sd +
-          " even though it was not configured to store edits:\n" +
-          "  " + Joiner.on("\n  ").join(editLogs));
-          
-    }
-    
     // set finalized flag
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }
 
-  private void addEditLog(EditLogFile foundEditLog) {
-    foundEditLogs.add(foundEditLog);
-    LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
-    if (group == null) {
-      group = new LogGroup(foundEditLog.getFirstTxId());
-      logGroups.put(foundEditLog.getFirstTxId(), group);
-    }
-    group.add(foundEditLog);
-  }
-
-
   @Override
   public boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
@@ -148,9 +123,13 @@ class FSImageTransactionalStorageInspect
    * If there are multiple storage directories which contain equal images 
    * the storage directory that was inspected first will be preferred.
    * 
-   * Returns null if no images were found.
+   * @throws FileNotFoundException if not images are found.
    */
-  FSImageFile getLatestImage() {
+  FSImageFile getLatestImage() throws IOException {
+    if (foundImages.isEmpty()) {
+      throw new FileNotFoundException("No valid image files found");
+    }
+
     FSImageFile ret = null;
     for (FSImageFile img : foundImages) {
       if (ret == null || img.txId > ret.txId) {
@@ -164,349 +143,13 @@ class FSImageTransactionalStorageInspect
     return ImmutableList.copyOf(foundImages);
   }
   
-  public List<EditLogFile> getEditLogFiles() {
-    return ImmutableList.copyOf(foundEditLogs);
-  }
-
-  @Override
-  public LoadPlan createLoadPlan() throws IOException {
-    if (foundImages.isEmpty()) {
-      throw new FileNotFoundException("No valid image files found");
-    }
-
-    FSImageFile recoveryImage = getLatestImage();
-    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
-
-    return new TransactionalLoadPlan(recoveryImage,
-        logPlan);
-  }
-  
-  /**
-   * Plan which logs to load in order to bring the namespace up-to-date.
-   * Transactions will be considered in the range (sinceTxId, maxTxId]
-   * 
-   * @param sinceTxId the highest txid that is already loaded 
-   *                  (eg from the image checkpoint)
-   * @param maxStartTxId ignore any log files that start after this txid
-   */
-  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
-    long expectedTxId = sinceTxId + 1;
-    
-    List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
-    
-    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
-    if (logGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
-          " groups of logs because they start with a txid less than image " +
-          "txid " + sinceTxId);
-    }
-    
-    SortedMap<Long, LogGroup> usefulGroups;
-    if (maxStartTxId > sinceTxId) {
-      usefulGroups = tailGroups.headMap(maxStartTxId);
-    } else {
-      usefulGroups = new TreeMap<Long, LogGroup>();
-    }
-    
-    if (usefulGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + 
-        " groups of logs because they start with a txid higher than max " +
-        "txid " + sinceTxId);
-    }
-
-
-    for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
-      long logStartTxId = entry.getKey();
-      LogGroup logGroup = entry.getValue();
-      
-      logGroup.planRecovery();
-      
-      if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) {
-        throw new IOException("Expected next log group would start at txid " +
-            expectedTxId + " but starts at txid " + logStartTxId);
-      }
-      
-      // We can pick any of the non-corrupt logs here
-      recoveryLogs.add(logGroup.getBestNonCorruptLog());
-      
-      // If this log group was finalized, we know to expect the next
-      // log group to start at the following txid (ie no gaps)
-      if (logGroup.hasKnownLastTxId()) {
-        expectedTxId = logGroup.getLastTxId() + 1;
-      } else {
-        // the log group was in-progress so we don't know what ID
-        // the next group should start from.
-        expectedTxId = HdfsConstants.INVALID_TXID;
-      }
-    }
-    
-    long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
-        0 : usefulGroups.lastKey();
-    if (maxSeenTxId > sinceTxId &&
-        maxSeenTxId > lastLogGroupStartTxId) {
-      String msg = "At least one storage directory indicated it has seen a " +
-        "log segment starting at txid " + maxSeenTxId;
-      if (usefulGroups.isEmpty()) {
-        msg += " but there are no logs to load.";
-      } else {
-        msg += " but the most recent log file found starts with txid " +
-          lastLogGroupStartTxId;
-      }
-      throw new IOException(msg);
-    }
-    
-    return new LogLoadPlan(recoveryLogs,
-        Lists.newArrayList(usefulGroups.values()));
-
-  }
-
   @Override
   public boolean needToSave() {
     return needToSave;
   }
-  
-  /**
-   * A group of logs that all start at the same txid.
-   * 
-   * Handles determining which logs are corrupt and which should be considered
-   * candidates for loading.
-   */
-  static class LogGroup {
-    long startTxId;
-    List<EditLogFile> logs = new ArrayList<EditLogFile>();;
-    private Set<Long> endTxIds = new TreeSet<Long>();
-    private boolean hasInProgress = false;
-    private boolean hasFinalized = false;
-        
-    LogGroup(long startTxId) {
-      this.startTxId = startTxId;
-    }
-    
-    EditLogFile getBestNonCorruptLog() {
-      // First look for non-corrupt finalized logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt() && !log.isInProgress()) {
-          return log;
-        }
-      }
-      // Then look for non-corrupt in-progress logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt()) {
-          return log;
-        }
-      }
 
-      // We should never get here, because we don't get to the planning stage
-      // without calling planRecovery first, and if we've called planRecovery,
-      // we would have already thrown if there were no non-corrupt logs!
-      throw new IllegalStateException(
-        "No non-corrupt logs for txid " + startTxId);
-    }
-
-    /**
-     * @return true if we can determine the last txid in this log group.
-     */
-    boolean hasKnownLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * @return the last txid included in the logs in this group
-     * @throws IllegalStateException if it is unknown -
-     *                               {@see #hasKnownLastTxId()}
-     */
-    long getLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return log.getLastTxId();
-        }
-      }
-      throw new IllegalStateException("LogGroup only has in-progress logs");
-    }
-
-    
-    void add(EditLogFile log) {
-      assert log.getFirstTxId() == startTxId;
-      logs.add(log);
-      
-      if (log.isInProgress()) {
-        hasInProgress = true;
-      } else {
-        hasFinalized = true;
-        endTxIds.add(log.getLastTxId());
-      }
-    }
-    
-    void planRecovery() throws IOException {
-      assert hasInProgress || hasFinalized;
-      
-      checkConsistentEndTxIds();
-        
-      if (hasFinalized && hasInProgress) {
-        planMixedLogRecovery();
-      } else if (!hasFinalized && hasInProgress) {
-        planAllInProgressRecovery();
-      } else if (hasFinalized && !hasInProgress) {
-        LOG.debug("No recovery necessary for logs starting at txid " +
-                  startTxId);
-      }
-    }
-
-    /**
-     * Recovery case for when some logs in the group were in-progress, and
-     * others were finalized. This happens when one of the storage
-     * directories fails.
-     *
-     * The in-progress logs in this case should be considered corrupt.
-     */
-    private void planMixedLogRecovery() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isInProgress()) {
-          LOG.warn("Log at " + log.getFile() + " is in progress, but " +
-                   "other logs starting at the same txid " + startTxId +
-                   " are finalized. Moving aside.");
-          log.markCorrupt();
-        }
-      }
-    }
-    
-    /**
-     * Recovery case for when all of the logs in the group were in progress.
-     * This happens if the NN completely crashes and restarts. In this case
-     * we check the non-zero lengths of each log file, and any logs that are
-     * less than the max of these lengths are considered corrupt.
-     */
-    private void planAllInProgressRecovery() throws IOException {
-      // We only have in-progress logs. We need to figure out which logs have
-      // the latest data to reccover them
-      LOG.warn("Logs beginning at txid " + startTxId + " were are all " +
-               "in-progress (probably truncated due to a previous NameNode " +
-               "crash)");
-      if (logs.size() == 1) {
-        // Only one log, it's our only choice!
-        EditLogFile log = logs.get(0);
-        if (log.validateLog().numTransactions == 0) {
-          // If it has no transactions, we should consider it corrupt just
-          // to be conservative.
-          // See comment below for similar case
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();          
-        }
-        return;
-      }
-
-      long maxValidTxnCount = Long.MIN_VALUE;
-      for (EditLogFile log : logs) {
-        long validTxnCount = log.validateLog().numTransactions;
-        LOG.warn("  Log " + log.getFile() +
-            " valid txns=" + validTxnCount +
-            " valid len=" + log.validateLog().validLength);
-        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
-      }        
-
-      for (EditLogFile log : logs) {
-        long txns = log.validateLog().numTransactions;
-        if (txns < maxValidTxnCount) {
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-                   "it is has only " + txns + " valid txns whereas another " +
-                   "log has " + maxValidTxnCount);
-          log.markCorrupt();
-        } else if (txns == 0) {
-          // this can happen if the NN crashes right after rolling a log
-          // but before the START_LOG_SEGMENT txn is written. Since the log
-          // is empty, we can just move it aside to its corrupt name.
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();
-        }
-      }
-    }
-
-    /**
-     * Check for the case when we have multiple finalized logs and they have
-     * different ending transaction IDs. This violates an invariant that all
-     * log directories should roll together. We should abort in this case.
-     */
-    private void checkConsistentEndTxIds() throws IOException {
-      if (hasFinalized && endTxIds.size() > 1) {
-        throw new IOException("More than one ending txid was found " +
-            "for logs starting at txid " + startTxId + ". " +
-            "Found: " + StringUtils.join(endTxIds, ','));
-      }
-    }
-
-    void recover() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isCorrupt()) {
-          log.moveAsideCorruptFile();
-        } else if (log.isInProgress()) {
-          log.finalizeLog();
-        }
-      }
-    }    
-  }
-  
-  static class TransactionalLoadPlan extends LoadPlan {
-    final FSImageFile image;
-    final LogLoadPlan logPlan;
-    
-    public TransactionalLoadPlan(FSImageFile image,
-        LogLoadPlan logPlan) {
-      super();
-      this.image = image;
-      this.logPlan = logPlan;
-    }
-
-    @Override
-    boolean doRecovery() throws IOException {
-      logPlan.doRecovery();
-      return false;
-    }
-
-    @Override
-    File getImageFile() {
-      return image.getFile();
-    }
-
-    @Override
-    List<File> getEditsFiles() {
-      return logPlan.getEditsFiles();
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return image.sd;
-    }
-  }
-  
-  static class LogLoadPlan {
-    final List<EditLogFile> editLogs;
-    final List<LogGroup> logGroupsToRecover;
-    
-    LogLoadPlan(List<EditLogFile> editLogs,
-        List<LogGroup> logGroupsToRecover) {
-      this.editLogs = editLogs;
-      this.logGroupsToRecover = logGroupsToRecover;
-    }
-
-    public void doRecovery() throws IOException {
-      for (LogGroup g : logGroupsToRecover) {
-        g.recover();
-      }
-    }
-
-    public List<File> getEditsFiles() {
-      List<File> ret = new ArrayList<File>();
-      for (EditLogFile log : editLogs) {
-        ret.add(log.getFile());
-      }
-      return ret;
-    }
+  @Override
+  long getMaxSeenTxId() {
+    return maxSeenTxId;
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Feb 26 23:32:06 2012
@@ -173,6 +173,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 
 /***************************************************
@@ -297,12 +298,43 @@ public class FSNamesystem implements Nam
   // lock to protect FSNamesystem.
   private ReentrantReadWriteLock fsLock;
 
+  
+  /**
+   * Instantiates an FSNamesystem loaded from the image and edits
+   * directories specified in the passed Configuration.
+   * 
+   * @param conf the Configuration which specifies the storage directories
+   *             from which to load
+   * @return an FSNamesystem which contains the loaded namespace
+   * @throws IOException if loading fails
+   */
+  public static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
+    FSImage fsImage = new FSImage(conf);
+    FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+
+    long loadStart = now();
+    StartupOption startOpt = NameNode.getStartupOption(conf);
+    namesystem.loadFSImage(startOpt, fsImage);
+    long timeTakenToLoadFSImage = now() - loadStart;
+    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
+    NameNode.getNameNodeMetrics().setFsImageLoadTime(
+                              (int) timeTakenToLoadFSImage);
+    return namesystem;
+  }
+
   /**
-   * FSNamesystem constructor.
+   * Create an FSNamesystem associated with the specified image.
+   * 
+   * Note that this does not load any data off of disk -- if you would
+   * like that behavior, use {@link #loadFromDisk(Configuration)}
+
+   * @param fnImage The FSImage to associate with
+   * @param conf configuration
+   * @throws IOException on bad configuration
    */
-  FSNamesystem(Configuration conf) throws IOException {
+  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
     try {
-      initialize(conf, null);
+      initialize(conf, fsImage);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -318,29 +350,41 @@ public class FSNamesystem implements Nam
     resourceRecheckInterval = conf.getLong(
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
-    nnResourceChecker = new NameNodeResourceChecker(conf);
-    checkAvailableResources();
     this.systemStart = now();
     this.blockManager = new BlockManager(this, this, conf);
     this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
-    this.registerMBean(); // register the MBean for the FSNamesystemState
-    if(fsImage == null) {
-      this.dir = new FSDirectory(this, conf);
-      StartupOption startOpt = NameNode.getStartupOption(conf);
-      this.dir.loadFSImage(startOpt);
-      long timeTakenToLoadFSImage = now() - systemStart;
-      LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
-      NameNode.getNameNodeMetrics().setFsImageLoadTime(
-                                (int) timeTakenToLoadFSImage);
-    } else {
-      this.dir = new FSDirectory(fsImage, this, conf);
-    }
+    this.dir = new FSDirectory(fsImage, this, conf);
     this.safeMode = new SafeModeInfo(conf);
   }
 
+  void loadFSImage(StartupOption startOpt, FSImage fsImage)
+      throws IOException {
+    // format before starting up if requested
+    if (startOpt == StartupOption.FORMAT) {
+      
+      fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
+
+      startOpt = StartupOption.REGULAR;
+    }
+    boolean success = false;
+    try {
+      if (fsImage.recoverTransitionRead(startOpt, this)) {
+        fsImage.saveNamespace(this);
+      }
+      fsImage.openEditLog();
+      
+      success = true;
+    } finally {
+      if (!success) {
+        fsImage.close();
+      }
+    }
+    dir.imageLoadComplete();
+  }
+
   void activateSecretManager() throws IOException {
     if (dtSecretManager != null) {
       dtSecretManager.startThreads();
@@ -351,8 +395,13 @@ public class FSNamesystem implements Nam
    * Activate FSNamesystem daemons.
    */
   void activate(Configuration conf) throws IOException {
+    this.registerMBean(); // register the MBean for the FSNamesystemState
+
     writeLock();
     try {
+      nnResourceChecker = new NameNodeResourceChecker(conf);
+      checkAvailableResources();
+
       setBlockTotal();
       blockManager.activate(conf);
 
@@ -436,37 +485,6 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * dirs is a list of directories where the filesystem directory state 
-   * is stored
-   */
-  FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
-    this.fsLock = new ReentrantReadWriteLock(true);
-    this.blockManager = new BlockManager(this, this, conf);
-    setConfigurationParameters(conf);
-    this.dir = new FSDirectory(fsImage, this, conf);
-    dtSecretManager = createDelegationTokenSecretManager(conf);
-  }
-
-  /**
-   * Create FSNamesystem for {@link BackupNode}.
-   * Should do everything that would be done for the NameNode,
-   * except for loading the image.
-   * 
-   * @param bnImage {@link BackupImage}
-   * @param conf configuration
-   * @throws IOException
-   */
-  FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException {
-    try {
-      initialize(conf, bnImage);
-    } catch(IOException e) {
-      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
-      close();
-      throw e;
-    }
-  }
-
-  /**
    * Initializes some of the members from configuration
    */
   private void setConfigurationParameters(Configuration conf) 
@@ -514,16 +532,23 @@ public class FSNamesystem implements Nam
   NamespaceInfo getNamespaceInfo() {
     readLock();
     try {
-      return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
-          getClusterId(), getBlockPoolId(),
-          dir.fsImage.getStorage().getCTime(),
-          upgradeManager.getUpgradeVersion());
+      return unprotectedGetNamespaceInfo();
     } finally {
       readUnlock();
     }
   }
 
   /**
+   * Version of {@see #getNamespaceInfo()} that is not protected by a lock.
+   */
+  NamespaceInfo unprotectedGetNamespaceInfo() {
+    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+        getClusterId(), getBlockPoolId(),
+        dir.fsImage.getStorage().getCTime(),
+        upgradeManager.getUpgradeVersion());
+  }
+
+  /**
    * Close down this file system manager.
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
@@ -2567,6 +2592,8 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   private void checkAvailableResources() throws IOException {
+    Preconditions.checkState(nnResourceChecker != null,
+        "nnResourceChecker not initialized");
     hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
   }
 
@@ -2752,7 +2779,7 @@ public class FSNamesystem implements Nam
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
       }
-      getFSImage().saveNamespace();
+      getFSImage().saveNamespace(this);
       LOG.info("New namespace image has been created.");
     } finally {
       readUnlock();

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Sun Feb 26 23:32:06 2012
@@ -80,7 +80,7 @@ public class FileChecksumServlets {
           dtParam + addrParam);
     }
 
-    /** {@inheritDoc} */
+    @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response
         ) throws ServletException, IOException {
       final ServletContext context = getServletContext();
@@ -104,7 +104,7 @@ public class FileChecksumServlets {
     /** For java.io.Serializable */
     private static final long serialVersionUID = 1L;
     
-    /** {@inheritDoc} */
+    @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response
         ) throws ServletException, IOException {
       final PrintWriter out = response.getWriter();

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Sun Feb 26 23:32:06 2012
@@ -23,11 +23,14 @@ import org.apache.commons.logging.LogFac
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Comparator;
+import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
@@ -57,6 +60,9 @@ class FileJournalManager implements Jour
   private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
     NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
 
+  private File currentInProgress = null;
+  private long maxSeenTransaction = 0L;
+
   @VisibleForTesting
   StoragePurger purger
     = new NNStorageRetentionManager.DeletionStoragePurger();
@@ -66,19 +72,20 @@ class FileJournalManager implements Jour
   }
 
   @Override
-  public EditLogOutputStream startLogSegment(long txid) throws IOException {    
-    File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
-    EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+  synchronized public EditLogOutputStream startLogSegment(long txid) 
+      throws IOException {
+    currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+    EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
         outputBufferCapacity);
     stm.create();
     return stm;
   }
 
   @Override
-  public void finalizeLogSegment(long firstTxId, long lastTxId)
+  synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
       throws IOException {
-    File inprogressFile = NNStorage.getInProgressEditsFile(
-        sd, firstTxId);
+    File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
+
     File dstFile = NNStorage.getFinalizedEditsFile(
         sd, firstTxId, lastTxId);
     LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
@@ -89,6 +96,9 @@ class FileJournalManager implements Jour
     if (!inprogressFile.renameTo(dstFile)) {
       throw new IOException("Unable to finalize edits file " + inprogressFile);
     }
+    if (inprogressFile.equals(currentInProgress)) {
+      currentInProgress = null;
+    }
   }
 
   @VisibleForTesting
@@ -97,12 +107,7 @@ class FileJournalManager implements Jour
   }
 
   @Override
-  public String toString() {
-    return "FileJournalManager for storage directory " + sd;
-  }
-
-  @Override
-  public void setOutputBufferCapacity(int size) {
+  synchronized public void setOutputBufferCapacity(int size) {
     this.outputBufferCapacity = size;
   }
 
@@ -120,13 +125,6 @@ class FileJournalManager implements Jour
     }
   }
 
-  @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-      throws IOException {
-    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
-    return new EditLogFileInputStream(f);
-  }
-  
   /**
    * Find all editlog segments starting at or above the given txid.
    * @param fromTxId the txnid which to start looking
@@ -178,17 +176,156 @@ class FileJournalManager implements Jour
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-            new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
+              new EditLogFile(f, startTxId, startTxId, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
           // skip
-        }          
+        }
       }
     }
     return ret;
   }
 
+  @Override
+  synchronized public EditLogInputStream getInputStream(long fromTxId) 
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (elf.getFirstTxId() == fromTxId) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Returning edit stream reading from " + elf);
+        }
+        return new EditLogFileInputStream(elf.getFile(), 
+            elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+
+    throw new IOException("Cannot find editlog file with " + fromTxId
+        + " as first first txid");
+  }
+
+  @Override
+  public long getNumberOfTransactions(long fromTxId) 
+      throws IOException, CorruptionException {
+    long numTxns = 0L;
+    
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Counting " + elf);
+      }
+      if (elf.getFirstTxId() > fromTxId) { // there must be a gap
+        LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+            + fromTxId + " - " + (elf.getFirstTxId() - 1));
+        break;
+      } else if (fromTxId == elf.getFirstTxId()) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        } 
+
+        if (elf.isCorrupt()) {
+          break;
+        }
+        fromTxId = elf.getLastTxId() + 1;
+        numTxns += fromTxId - elf.getFirstTxId();
+        
+        if (elf.isInProgress()) {
+          break;
+        }
+      } // else skip
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Journal " + this + " has " + numTxns 
+                + " txns from " + fromTxId);
+    }
+
+    long max = findMaxTransaction();
+    // fromTxId should be greater than max, as it points to the next 
+    // transaction we should expect to find. If it is less than or equal
+    // to max, it means that a transaction with txid == max has not been found
+    if (numTxns == 0 && fromTxId <= max) { 
+      String error = String.format("Gap in transactions, max txnid is %d"
+                                   + ", 0 txns from %d", max, fromTxId);
+      LOG.error(error);
+      throw new CorruptionException(error);
+    }
+
+    return numTxns;
+  }
+
+  @Override
+  synchronized public void recoverUnfinalizedSegments() throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    
+    // make sure journal is aware of max seen transaction before moving corrupt 
+    // files aside
+    findMaxTransaction();
+
+    for (EditLogFile elf : allLogFiles) {
+      if (elf.getFile().equals(currentInProgress)) {
+        continue;
+      }
+      if (elf.isInProgress()) {
+        elf.validateLog();
+
+        if (elf.isCorrupt()) {
+          elf.moveAsideCorruptFile();
+          continue;
+        }
+        finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+  }
+
+  private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    List<EditLogFile> logFiles = Lists.newArrayList();
+    
+    for (EditLogFile elf : allLogFiles) {
+      if (fromTxId > elf.getFirstTxId()
+          && fromTxId <= elf.getLastTxId()) {
+        throw new IOException("Asked for fromTxId " + fromTxId
+            + " which is in middle of file " + elf.file);
+      }
+      if (fromTxId <= elf.getFirstTxId()) {
+        logFiles.add(elf);
+      }
+    }
+    
+    Collections.sort(logFiles, EditLogFile.COMPARE_BY_START_TXID);
+
+    return logFiles;
+  }
+
+  /** 
+   * Find the maximum transaction in the journal.
+   * This gets stored in a member variable, as corrupt edit logs
+   * will be moved aside, but we still need to remember their first
+   * tranaction id in the case that it was the maximum transaction in
+   * the journal.
+   */
+  private long findMaxTransaction()
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(0)) {
+      if (elf.isInProgress()) {
+        maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
+        elf.validateLog();
+      }
+      maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
+    }
+    return maxSeenTransaction;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FileJournalManager(root=%s)", sd.getRoot());
+  }
+
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
@@ -196,12 +333,10 @@ class FileJournalManager implements Jour
     private File file;
     private final long firstTxId;
     private long lastTxId;
-    
-    private EditLogValidation cachedValidation = null;
+
     private boolean isCorrupt = false;
-    
-    static final long UNKNOWN_END = -1;
-    
+    private final boolean isInProgress;
+
     final static Comparator<EditLogFile> COMPARE_BY_START_TXID 
       = new Comparator<EditLogFile>() {
       public int compare(EditLogFile a, EditLogFile b) {
@@ -214,30 +349,24 @@ class FileJournalManager implements Jour
 
     EditLogFile(File file,
         long firstTxId, long lastTxId) {
-      assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
-      assert firstTxId > 0;
+      this(file, firstTxId, lastTxId, false);
+      assert (lastTxId != HdfsConstants.INVALID_TXID)
+        && (lastTxId >= firstTxId);
+    }
+    
+    EditLogFile(File file, long firstTxId, 
+                long lastTxId, boolean isInProgress) { 
+      assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress)
+        || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId);
+      assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
+      this.isInProgress = isInProgress;
     }
     
-    public void finalizeLog() throws IOException {
-      long numTransactions = validateLog().numTransactions;
-      long lastTxId = firstTxId + numTransactions - 1;
-      File dst = new File(file.getParentFile(),
-          NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId));
-      LOG.info("Finalizing edits log " + file + " by renaming to "
-          + dst.getName());
-      if (!file.renameTo(dst)) {
-        throw new IOException("Couldn't finalize log " +
-            file + " to " + dst);
-      }
-      this.lastTxId = lastTxId;
-      file = dst;
-    }
-
     long getFirstTxId() {
       return firstTxId;
     }
@@ -246,15 +375,22 @@ class FileJournalManager implements Jour
       return lastTxId;
     }
 
-    EditLogValidation validateLog() throws IOException {
-      if (cachedValidation == null) {
-        cachedValidation = EditLogFileInputStream.validateEditLog(file);
+    /** 
+     * Count the number of valid transactions in a log.
+     * This will update the lastTxId of the EditLogFile or
+     * mark it as corrupt if it is.
+     */
+    void validateLog() throws IOException {
+      EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+      if (val.getNumTransactions() == 0) {
+        markCorrupt();
+      } else {
+        this.lastTxId = val.getEndTxId();
       }
-      return cachedValidation;
     }
 
     boolean isInProgress() {
-      return (lastTxId == UNKNOWN_END);
+      return isInProgress;
     }
 
     File getFile() {

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sun Feb 26 23:32:06 2012
@@ -261,13 +261,13 @@ public abstract class INode implements C
     this.name = name;
   }
 
-  /** {@inheritDoc} */
+  @Override
   public String getFullPathName() {
     // Get the full path name of this inode.
     return FSDirectory.getFullPathName(this);
   }
 
-  /** {@inheritDoc} */
+  @Override
   public String toString() {
     return "\"" + getFullPathName() + "\":"
     + getUserName() + ":" + getGroupName() + ":"

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Sun Feb 26 23:32:06 2012
@@ -372,7 +372,7 @@ class INodeDirectory extends INode {
     return parent;
   }
 
-  /** {@inheritDoc} */
+  @Override
   DirCounts spaceConsumedInTree(DirCounts counts) {
     counts.nsCount += 1;
     if (children != null) {
@@ -383,7 +383,7 @@ class INodeDirectory extends INode {
     return counts;    
   }
 
-  /** {@inheritDoc} */
+  @Override
   long[] computeContentSummary(long[] summary) {
     // Walk through the children of this node, using a new summary array
     // for the (sub)tree rooted at this node

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sun Feb 26 23:32:06 2012
@@ -168,7 +168,7 @@ public class INodeFile extends INode {
     return 1;
   }
 
-  /** {@inheritDoc} */
+  @Override
   long[] computeContentSummary(long[] summary) {
     summary[0] += computeFileSize(true);
     summary[1]++;