Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC

svn commit: r1152295 [4/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
+  public static final Log LOG = LogFactory.getLog(
+    FSImageTransactionalStorageInspector.class);
+
+  private boolean needToSave = false;
+  private boolean isUpgradeFinalized = true;
+  
+  List<FoundFSImage> foundImages = new ArrayList<FoundFSImage>();
+  List<FoundEditLog> foundEditLogs = new ArrayList<FoundEditLog>();
+  SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
+  long maxSeenTxId = 0;
+  
+  private static final Pattern IMAGE_REGEX = Pattern.compile(
+    NameNodeFile.IMAGE.getName() + "_(\\d+)");
+  private static final Pattern EDITS_REGEX = Pattern.compile(
+    NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
+  private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
+    NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
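+
+  // Example filenames matched above (txids illustrative; on-disk names may
+  // be zero-padded, which these patterns also accept):
+  //   fsimage_42          -> IMAGE_REGEX
+  //   edits_1-42          -> EDITS_REGEX
+  //   edits_inprogress_43 -> EDITS_INPROGRESS_REGEX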
+
+  @Override
+  public void inspectDirectory(StorageDirectory sd) throws IOException {
+    // Was the directory just formatted?
+    if (!sd.getVersionFile().exists()) {
+      LOG.info("No version file in " + sd.getRoot());
+      needToSave = true;
+      return;
+    }
+    
+    File currentDir = sd.getCurrentDir();
+    File filesInStorage[];
+    try {
+      filesInStorage = FileUtil.listFiles(currentDir);
+    } catch (IOException ioe) {
+      LOG.warn("Unable to inspect storage directory " + currentDir,
+          ioe);
+      return;
+    }
+
+    for (File f : filesInStorage) {
+      LOG.debug("Checking file " + f);
+      String name = f.getName();
+      
+      // Check for fsimage_*
+      Matcher imageMatch = IMAGE_REGEX.matcher(name);
+      if (imageMatch.matches()) {
+        if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
+          try {
+            long txid = Long.valueOf(imageMatch.group(1));
+            foundImages.add(new FoundFSImage(sd, f, txid));
+          } catch (NumberFormatException nfe) {
+            LOG.error("Image file " + f + " has improperly formatted " +
+                      "transaction ID");
+            // skip
+          }
+        } else {
+          LOG.warn("Found image file at " + f + " but storage directory is " +
+                   "not configured to contain images.");
+        }
+      }
+    }
+    
+
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
+    }
+    
+    List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      for (FoundEditLog log : editLogs) {
+        addEditLog(log);
+      }
+    } else if (!editLogs.isEmpty()){
+      LOG.warn("Found the following edit log file(s) in " + sd +
+          " even though it was not configured to store edits:\n" +
+          "  " + Joiner.on("\n  ").join(editLogs));
+    }
+    
+    // set finalized flag
+    isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
+  }
+
+  static List<FoundEditLog> matchEditLogs(File[] filesInStorage) {
+    List<FoundEditLog> ret = Lists.newArrayList();
+    for (File f : filesInStorage) {
+      String name = f.getName();
+      // Check for edits
+      Matcher editsMatch = EDITS_REGEX.matcher(name);
+      if (editsMatch.matches()) {
+        try {
+          long startTxId = Long.valueOf(editsMatch.group(1));
+          long endTxId = Long.valueOf(editsMatch.group(2));
+          ret.add(new FoundEditLog(f, startTxId, endTxId));
+        } catch (NumberFormatException nfe) {
+          LOG.error("Edits file " + f + " has improperly formatted " +
+                    "transaction ID");
+          // skip
+        }          
+      }
+      
+      // Check for in-progress edits
+      Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
+      if (inProgressEditsMatch.matches()) {
+        try {
+          long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
+          ret.add(
+            new FoundEditLog(f, startTxId, FoundEditLog.UNKNOWN_END));
+        } catch (NumberFormatException nfe) {
+          LOG.error("In-progress edits file " + f + " has improperly " +
+                    "formatted transaction ID");
+          // skip
+        }          
+      }
+    }
+    return ret;
+  }
+
+  private void addEditLog(FoundEditLog foundEditLog) {
+    foundEditLogs.add(foundEditLog);
+    LogGroup group = logGroups.get(foundEditLog.startTxId);
+    if (group == null) {
+      group = new LogGroup(foundEditLog.startTxId);
+      logGroups.put(foundEditLog.startTxId, group);
+    }
+    group.add(foundEditLog);
+  }
+
+
+  @Override
+  public boolean isUpgradeFinalized() {
+    return isUpgradeFinalized;
+  }
+  
+  /**
+   * @return the image that has the most recent associated transaction ID.
+   * If multiple storage directories contain equal images, the storage
+   * directory that was inspected first is preferred.
+   * 
+   * Returns null if no images were found.
+   */
+  FoundFSImage getLatestImage() {
+    FoundFSImage ret = null;
+    for (FoundFSImage img : foundImages) {
+      if (ret == null || img.txId > ret.txId) {
+        ret = img;
+      }
+    }
+    return ret;
+  }
+  
+  public List<FoundFSImage> getFoundImages() {
+    return ImmutableList.copyOf(foundImages);
+  }
+  
+  public List<FoundEditLog> getFoundEditLogs() {
+    return ImmutableList.copyOf(foundEditLogs);
+  }
+
+  @Override
+  public LoadPlan createLoadPlan() throws IOException {
+    if (foundImages.isEmpty()) {
+      throw new FileNotFoundException("No valid image files found");
+    }
+
+    FoundFSImage recoveryImage = getLatestImage();
+    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
+
+    return new TransactionalLoadPlan(recoveryImage,
+        logPlan);
+  }
+  
+  /**
+   * Plan which logs to load in order to bring the namespace up-to-date.
+   * Only transactions with txids greater than sinceTxId are considered, and
+   * log groups that start at or after maxStartTxId are ignored.
+   * 
+   * @param sinceTxId the highest txid that is already loaded 
+   *                  (e.g. from the image checkpoint)
+   * @param maxStartTxId ignore any log groups that start at or after this txid
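+   *
+   * For example (illustrative): with sinceTxId=100, the first group loaded
+   * must start at txid 101, and each finalized group must be followed by a
+   * group starting at its last txid + 1; a gap raises an IOException.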
+   */
+  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
+    long expectedTxId = sinceTxId + 1;
+    
+    List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
+    
+    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
+    if (logGroups.size() > tailGroups.size()) {
+      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
+          " groups of logs because they start with a txid less than image " +
+          "txid " + sinceTxId);
+    }
+    
+    SortedMap<Long, LogGroup> usefulGroups;
+    if (maxStartTxId > sinceTxId) {
+      usefulGroups = tailGroups.headMap(maxStartTxId);
+    } else {
+      usefulGroups = new TreeMap<Long, LogGroup>();
+    }
+    
+    if (tailGroups.size() > usefulGroups.size()) {
+      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + 
+        " groups of logs because they start with a txid higher than max " +
+        "txid " + maxStartTxId);
+    }
+
+
+    for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
+      long logStartTxId = entry.getKey();
+      LogGroup logGroup = entry.getValue();
+      
+      logGroup.planRecovery();
+      
+      if (expectedTxId != FSConstants.INVALID_TXID && logStartTxId != expectedTxId) {
+        throw new IOException("Expected next log group would start at txid " +
+            expectedTxId + " but starts at txid " + logStartTxId);
+      }
+      
+      // We can pick any of the non-corrupt logs here
+      recoveryLogs.add(logGroup.getBestNonCorruptLog());
+      
+      // If this log group was finalized, we know to expect the next
+      // log group to start at the following txid (ie no gaps)
+      if (logGroup.hasKnownLastTxId()) {
+        expectedTxId = logGroup.getLastTxId() + 1;
+      } else {
+        // the log group was in-progress so we don't know what ID
+        // the next group should start from.
+        expectedTxId = FSConstants.INVALID_TXID;
+      }
+    }
+    
+    long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
+        0 : usefulGroups.lastKey();
+    if (maxSeenTxId > sinceTxId &&
+        maxSeenTxId > lastLogGroupStartTxId) {
+      String msg = "At least one storage directory indicated it has seen a " +
+        "log segment starting at txid " + maxSeenTxId;
+      if (usefulGroups.isEmpty()) {
+        msg += " but there are no logs to load.";
+      } else {
+        msg += " but the most recent log file found starts with txid " +
+          lastLogGroupStartTxId;
+      }
+      throw new IOException(msg);
+    }
+    
+    return new LogLoadPlan(recoveryLogs,
+        Lists.newArrayList(usefulGroups.values()));
+
+  }
+
+  @Override
+  public boolean needToSave() {
+    return needToSave;
+  }
+  
+  
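+  /**
+   * @return a manifest of the finalized, non-corrupt logs whose last txid
+   * is at least sinceTxId. For example (illustrative), given finalized logs
+   * 1-100 and 101-200 and sinceTxId=150, only 101-200 is included.
+   */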
+  RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
+    List<RemoteEditLog> logs = Lists.newArrayList();
+    for (LogGroup g : logGroups.values()) {
+      if (!g.hasFinalized) continue;
+
+      FoundEditLog fel = g.getBestNonCorruptLog();
+      if (fel.getLastTxId() < sinceTxId) continue;
+      
+      logs.add(new RemoteEditLog(fel.getStartTxId(),
+          fel.getLastTxId()));
+    }
+    
+    return new RemoteEditLogManifest(logs);
+  }
+
+  /**
+   * A group of logs that all start at the same txid.
+   * 
+   * Handles determining which logs are corrupt and which should be considered
+   * candidates for loading.
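+   *
+   * For example (illustrative): edits_101-200 in two directories plus
+   * edits_inprogress_101 in a third all belong to the group with
+   * startTxId 101; recovery marks the in-progress copy corrupt.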
+   */
+  static class LogGroup {
+    long startTxId;
+    List<FoundEditLog> logs = new ArrayList<FoundEditLog>();
+    private Set<Long> endTxIds = new TreeSet<Long>();
+    private boolean hasInProgress = false;
+    private boolean hasFinalized = false;
+        
+    LogGroup(long startTxId) {
+      this.startTxId = startTxId;
+    }
+    
+    FoundEditLog getBestNonCorruptLog() {
+      // First look for non-corrupt finalized logs
+      for (FoundEditLog log : logs) {
+        if (!log.isCorrupt() && !log.isInProgress()) {
+          return log;
+        }
+      }
+      // Then look for non-corrupt in-progress logs
+      for (FoundEditLog log : logs) {
+        if (!log.isCorrupt()) {
+          return log;
+        }
+      }
+
+      // We should never get here, because we don't get to the planning stage
+      // without calling planRecovery first, and if we've called planRecovery,
+      // we would have already thrown if there were no non-corrupt logs!
+      throw new IllegalStateException(
+        "No non-corrupt logs for txid " + startTxId);
+    }
+
+    /**
+     * @return true if we can determine the last txid in this log group.
+     */
+    boolean hasKnownLastTxId() {
+      for (FoundEditLog log : logs) {
+        if (!log.isInProgress()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * @return the last txid included in the logs in this group
+     * @throws IllegalStateException if the last txid is unknown - see
+     *                               {@link #hasKnownLastTxId()}
+     */
+    long getLastTxId() {
+      for (FoundEditLog log : logs) {
+        if (!log.isInProgress()) {
+          return log.lastTxId;
+        }
+      }
+      throw new IllegalStateException("LogGroup only has in-progress logs");
+    }
+
+    
+    void add(FoundEditLog log) {
+      assert log.getStartTxId() == startTxId;
+      logs.add(log);
+      
+      if (log.isInProgress()) {
+        hasInProgress = true;
+      } else {
+        hasFinalized = true;
+        endTxIds.add(log.lastTxId);
+      }
+    }
+    
+    void planRecovery() throws IOException {
+      assert hasInProgress || hasFinalized;
+      
+      checkConsistentEndTxIds();
+        
+      if (hasFinalized && hasInProgress) {
+        planMixedLogRecovery();
+      } else if (!hasFinalized && hasInProgress) {
+        planAllInProgressRecovery();
+      } else if (hasFinalized && !hasInProgress) {
+        LOG.debug("No recovery necessary for logs starting at txid " +
+                  startTxId);
+      }
+    }
+
+    /**
+     * Recovery case for when some logs in the group were in-progress, and
+     * others were finalized. This happens when one of the storage
+     * directories fails.
+     *
+     * The in-progress logs in this case should be considered corrupt.
+     */
+    private void planMixedLogRecovery() throws IOException {
+      for (FoundEditLog log : logs) {
+        if (log.isInProgress()) {
+          LOG.warn("Log at " + log.getFile() + " is in progress, but " +
+                   "other logs starting at the same txid " + startTxId +
+                   " are finalized. Moving aside.");
+          log.markCorrupt();
+        }
+      }
+    }
+    
+    /**
+     * Recovery case for when all of the logs in the group were in progress.
+     * This happens if the NN completely crashes and restarts. In this case
+     * we validate each log and count its valid transactions; any log with
+     * fewer valid transactions than the maximum found (or with none at all)
+     * is considered corrupt.
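+     *
+     * For example (illustrative): given three in-progress logs holding 10,
+     * 10, and 7 valid transactions, the 7-transaction log is marked corrupt.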
+     */
+    private void planAllInProgressRecovery() throws IOException {
+      // We only have in-progress logs. We need to figure out which logs have
+      // the latest data in order to recover them.
+      LOG.warn("Logs beginning at txid " + startTxId + " are all " +
+               "in-progress (probably truncated due to a previous NameNode " +
+               "crash)");
+      if (logs.size() == 1) {
+        // Only one log, it's our only choice!
+        FoundEditLog log = logs.get(0);
+        if (log.validateLog().numTransactions == 0) {
+          // If it has no transactions, we should consider it corrupt just
+          // to be conservative.
+          // See comment below for similar case
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+              "it has no transactions in it.");
+          log.markCorrupt();          
+        }
+        return;
+      }
+
+      long maxValidTxnCount = Long.MIN_VALUE;
+      for (FoundEditLog log : logs) {
+        long validTxnCount = log.validateLog().numTransactions;
+        LOG.warn("  Log " + log.getFile() +
+            " valid txns=" + validTxnCount +
+            " valid len=" + log.validateLog().validLength);
+        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
+      }        
+
+      for (FoundEditLog log : logs) {
+        long txns = log.validateLog().numTransactions;
+        if (txns < maxValidTxnCount) {
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+                   "it is has only " + txns + " valid txns whereas another " +
+                   "log has " + maxValidTxnCount);
+          log.markCorrupt();
+        } else if (txns == 0) {
+          // this can happen if the NN crashes right after rolling a log
+          // but before the START_LOG_SEGMENT txn is written. Since the log
+          // is empty, we can just move it aside to its corrupt name.
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+              "it has no transactions in it.");
+          log.markCorrupt();
+        }
+      }
+    }
+
+    /**
+     * Check for the case when we have multiple finalized logs and they have
+     * different ending transaction IDs. This violates an invariant that all
+     * log directories should roll together. We should abort in this case.
+     */
+    private void checkConsistentEndTxIds() throws IOException {
+      if (hasFinalized && endTxIds.size() > 1) {
+        throw new IOException("More than one ending txid was found " +
+            "for logs starting at txid " + startTxId + ". " +
+            "Found: " + StringUtils.join(endTxIds, ','));
+      }
+    }
+
+    void recover() throws IOException {
+      for (FoundEditLog log : logs) {
+        if (log.isCorrupt()) {
+          log.moveAsideCorruptFile();
+        } else if (log.isInProgress()) {
+          log.finalizeLog();
+        }
+      }
+    }    
+  }
+
+  /**
+   * Record of an image that has been located and had its filename parsed.
+   */
+  static class FoundFSImage {
+    final StorageDirectory sd;    
+    final long txId;
+    private final File file;
+    
+    FoundFSImage(StorageDirectory sd, File file, long txId) {
+      assert txId >= 0 : "Invalid txid on " + file + ": " + txId;
+      
+      this.sd = sd;
+      this.txId = txId;
+      this.file = file;
+    } 
+    
+    File getFile() {
+      return file;
+    }
+
+    public long getTxId() {
+      return txId;
+    }
+    
+    @Override
+    public String toString() {
+      return file.toString();
+    }
+  }
+  
+  /**
+   * Record of an edit log that has been located and had its filename parsed.
+   */
+  static class FoundEditLog {
+    File file;
+    final long startTxId;
+    long lastTxId;
+    
+    private EditLogValidation cachedValidation = null;
+    private boolean isCorrupt = false;
+    
+    static final long UNKNOWN_END = -1;
+    
+    FoundEditLog(File file,
+        long startTxId, long endTxId) {
+      assert endTxId == UNKNOWN_END || endTxId >= startTxId;
+      assert startTxId > 0;
+      assert file != null;
+      
+      this.startTxId = startTxId;
+      this.lastTxId = endTxId;
+      this.file = file;
+    }
+    
+    public void finalizeLog() throws IOException {
+      long numTransactions = validateLog().numTransactions;
+      long lastTxId = startTxId + numTransactions - 1;
+      File dst = new File(file.getParentFile(),
+          NNStorage.getFinalizedEditsFileName(startTxId, lastTxId));
+      LOG.info("Finalizing edits log " + file + " by renaming to "
+          + dst.getName());
+      if (!file.renameTo(dst)) {
+        throw new IOException("Couldn't finalize log " +
+            file + " to " + dst);
+      }
+      this.lastTxId = lastTxId;
+      file = dst;
+    }
+
+    long getStartTxId() {
+      return startTxId;
+    }
+    
+    long getLastTxId() {
+      return lastTxId;
+    }
+
+    EditLogValidation validateLog() throws IOException {
+      if (cachedValidation == null) {
+        cachedValidation = FSEditLogLoader.validateEditLog(file);
+      }
+      return cachedValidation;
+    }
+
+    boolean isInProgress() {
+      return (lastTxId == UNKNOWN_END);
+    }
+
+    File getFile() {
+      return file;
+    }
+    
+    void markCorrupt() {
+      isCorrupt = true;
+    }
+    
+    boolean isCorrupt() {
+      return isCorrupt;
+    }
+
+    void moveAsideCorruptFile() throws IOException {
+      assert isCorrupt;
+    
+      File src = file;
+      File dst = new File(src.getParent(), src.getName() + ".corrupt");
+      boolean success = src.renameTo(dst);
+      if (!success) {
+        throw new IOException(
+          "Couldn't rename corrupt log " + src + " to " + dst);
+      }
+      file = dst;
+    }
+    
+    @Override
+    public String toString() {
+      return file.toString();
+    }
+  }
+
+  static class TransactionalLoadPlan extends LoadPlan {
+    final FoundFSImage image;
+    final LogLoadPlan logPlan;
+    
+    public TransactionalLoadPlan(FoundFSImage image,
+        LogLoadPlan logPlan) {
+      super();
+      this.image = image;
+      this.logPlan = logPlan;
+    }
+
+    @Override
+    boolean doRecovery() throws IOException {
+      logPlan.doRecovery();
+      return false;
+    }
+
+    @Override
+    File getImageFile() {
+      return image.getFile();
+    }
+
+    @Override
+    List<File> getEditsFiles() {
+      return logPlan.getEditsFiles();
+    }
+
+    @Override
+    StorageDirectory getStorageDirectoryForProperties() {
+      return image.sd;
+    }
+  }
+  
+  static class LogLoadPlan {
+    final List<FoundEditLog> editLogs;
+    final List<LogGroup> logGroupsToRecover;
+    
+    LogLoadPlan(List<FoundEditLog> editLogs,
+        List<LogGroup> logGroupsToRecover) {
+      this.editLogs = editLogs;
+      this.logGroupsToRecover = logGroupsToRecover;
+    }
+
+    public void doRecovery() throws IOException {
+      for (LogGroup g : logGroupsToRecover) {
+        g.recover();
+      }
+    }
+
+    public List<File> getEditsFiles() {
+      List<File> ret = new ArrayList<File>();
+      for (FoundEditLog log : editLogs) {
+        ret.add(log.getFile());
+      }
+      return ret;
+    }
+  }
+}
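
The following is a hypothetical usage sketch (not part of this commit) showing
how a caller might drive the new inspector; the dirIterator() loop over the
NNStorage instance is an assumption for illustration:

    FSImageTransactionalStorageInspector inspector =
        new FSImageTransactionalStorageInspector();
    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
      inspector.inspectDirectory(it.next());    // scan fsimage_* / edits_* files
    }
    LoadPlan plan = inspector.createLoadPlan(); // throws if no image was found
    plan.doRecovery();                          // finalize or move aside bad logs
    File imageFile = plan.getImageFile();       // newest fsimage_<txid>
    List<File> editFiles = plan.getEditsFiles(); // logs to replay after the image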

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 29 16:28:45 2011
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -124,6 +125,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -323,8 +325,7 @@ public class FSNamesystem implements FSC
     if(fsImage == null) {
       this.dir = new FSDirectory(this, conf);
       StartupOption startOpt = NameNode.getStartupOption(conf);
-      this.dir.loadFSImage(getNamespaceDirs(conf),
-                           getNamespaceEditsDirs(conf), startOpt);
+      this.dir.loadFSImage(startOpt);
       long timeTakenToLoadFSImage = now() - systemStart;
       LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
       NameNode.getNameNodeMetrics().setFsImageLoadTime(
@@ -392,8 +393,9 @@ public class FSNamesystem implements FSC
           + propertyName + "\" in hdfs-site.xml;" +
           "\n\t\t- use Backup Node as a persistent and up-to-date storage " +
           "of the file system meta-data.");
-    } else if (dirNames.isEmpty())
-      dirNames.add("file:///tmp/hadoop/dfs/name");
+    } else if (dirNames.isEmpty()) {
+      dirNames = Collections.singletonList("file:///tmp/hadoop/dfs/name");
+    }
     return Util.stringCollectionAsURIs(dirNames);
   }
 
@@ -3258,7 +3260,7 @@ public class FSNamesystem implements FSC
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
       }
-      getFSImage().saveNamespace(true);
+      getFSImage().saveNamespace();
       LOG.info("New namespace image has been created.");
     } finally {
       readUnlock();
@@ -4003,8 +4005,8 @@ public class FSNamesystem implements FSC
     }
   }
 
-  long getEditLogSize() throws IOException {
-    return getEditLog().getEditLogSize();
+  public long getTransactionID() {
+    return getEditLog().getSyncTxId();
   }
 
   CheckpointSignature rollEditLog() throws IOException {
@@ -4019,24 +4021,9 @@ public class FSNamesystem implements FSC
       writeUnlock();
     }
   }
-
-  /**
-   * Moves fsimage.ckpt to fsImage and edits.new to edits
-   * Reopens the new edits file.
-   *
-   * @param sig the signature of this checkpoint (old image)
-   */
-  void rollFSImage(CheckpointSignature sig) throws IOException {
-    writeLock();
-    try {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Image not rolled", safeMode);
-      }
-      LOG.info("Roll FSImage from " + Server.getRemoteAddress());
-      getFSImage().rollFSImage(sig, true);
-    } finally {
-      writeUnlock();
-    }
+  
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
+    return getEditLog().getEditLogManifest(sinceTxId);
   }
 
   NamenodeCommand startCheckpoint(
@@ -4516,31 +4503,29 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Register a name-node.
-   * <p>
-   * Registration is allowed if there is no ongoing streaming to
-   * another backup node.
-   * We currently allow only one backup node, but multiple chackpointers 
-   * if there are no backups.
+   * Register a Backup name-node, verifying that it belongs
+   * to the correct namespace, and adding it to the set of
+   * active journals if necessary.
    * 
-   * @param registration
-   * @throws IOException
+   * @param bnReg registration of the new BackupNode
+   * @param nnReg registration of this NameNode
+   * @throws IOException if the namespace IDs do not match
    */
-  void registerBackupNode(NamenodeRegistration registration)
-    throws IOException {
+  void registerBackupNode(NamenodeRegistration bnReg,
+      NamenodeRegistration nnReg) throws IOException {
     writeLock();
     try {
       if(getFSImage().getStorage().getNamespaceID() 
-         != registration.getNamespaceID())
+         != bnReg.getNamespaceID())
         throw new IOException("Incompatible namespaceIDs: "
             + " Namenode namespaceID = "
             + getFSImage().getStorage().getNamespaceID() + "; "
-            + registration.getRole() +
-            " node namespaceID = " + registration.getNamespaceID());
-      boolean regAllowed = getEditLog().checkBackupRegistration(registration);
-      if(!regAllowed)
-        throw new IOException("Registration is not allowed. " +
-                              "Another node is registered as a backup.");
+            + bnReg.getRole() +
+            " node namespaceID = " + bnReg.getNamespaceID());
+      if (bnReg.getRole() == NamenodeRole.BACKUP) {
+        getFSImage().getEditLog().registerBackupNode(
+            bnReg, nnReg);
+      }
     } finally {
       writeUnlock();
     }
@@ -5080,4 +5065,6 @@ public class FSNamesystem implements FSC
   void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
     getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
   }
+
+
 }

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Journal manager for the common case of edits files being written
+ * to a storage directory.
+ * 
+ * Note: this class is not thread-safe and should be externally
+ * synchronized.
+ */
+class FileJournalManager implements JournalManager {
+  private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
+
+  private final StorageDirectory sd;
+  private int outputBufferCapacity = 512*1024;
+
+  public FileJournalManager(StorageDirectory sd) {
+    this.sd = sd;
+  }
+
+  @Override
+  public EditLogOutputStream startLogSegment(long txid) throws IOException {    
+    File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+    EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+        outputBufferCapacity);
+    stm.create();
+    return stm;
+  }
+
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+    File inprogressFile = NNStorage.getInProgressEditsFile(
+        sd, firstTxId);
+    File dstFile = NNStorage.getFinalizedEditsFile(
+        sd, firstTxId, lastTxId);
+    LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
+    
+    Preconditions.checkState(!dstFile.exists(),
+        "Can't finalize edits file " + inprogressFile + " since finalized file " +
+        "already exists");
+    if (!inprogressFile.renameTo(dstFile)) {
+      throw new IOException("Unable to finalize edits file " + inprogressFile);
+    }
+  }
+
+  @VisibleForTesting
+  public StorageDirectory getStorageDirectory() {
+    return sd;
+  }
+
+  @Override
+  public String toString() {
+    return "FileJournalManager for storage directory " + sd;
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    this.outputBufferCapacity = size;
+  }
+
+  @Override
+  public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+      throws IOException {
+    File[] files = FileUtil.listFiles(sd.getCurrentDir());
+    List<FoundEditLog> editLogs = 
+      FSImageTransactionalStorageInspector.matchEditLogs(files);
+    for (FoundEditLog log : editLogs) {
+      if (log.getStartTxId() < minTxIdToKeep &&
+          log.getLastTxId() < minTxIdToKeep) {
+        purger.purgeLog(log);
+      }
+    }
+  }
+
+  @Override
+  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+      throws IOException {
+    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
+    return new EditLogFileInputStream(f);
+  }
+
+}
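
A hypothetical lifecycle sketch (not part of this commit) for the journal
manager above; the edit log's write path is elided:

    JournalManager jm = new FileJournalManager(sd);    // sd: a StorageDirectory
    EditLogOutputStream out = jm.startLogSegment(101); // creates edits_inprogress_101
    // ... FSEditLog writes and syncs transactions 101..200 through 'out' ...
    out.close();
    jm.finalizeLogSegment(101, 200);                   // renames to edits_101-200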

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Fri Jul 29 16:28:45 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.na
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.io.*;
+import java.net.InetSocketAddress;
+
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -34,11 +36,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class is used in Namesystem's jetty to retrieve a file.
  * Typically used by the Secondary NameNode to retrieve image and
@@ -50,15 +57,21 @@ public class GetImageServlet extends Htt
 
   private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
 
-  @SuppressWarnings("unchecked")
+  private static final String TXID_PARAM = "txid";
+  private static final String START_TXID_PARAM = "startTxId";
+  private static final String END_TXID_PARAM = "endTxId";
+  private static final String STORAGEINFO_PARAM = "storageInfo";
+  
+  private static Set<Long> currentlyDownloadingCheckpoints =
+    Collections.<Long>synchronizedSet(new HashSet<Long>());
+  
   public void doGet(final HttpServletRequest request,
                     final HttpServletResponse response
                     ) throws ServletException, IOException {
-    Map<String,String[]> pmap = request.getParameterMap();
     try {
       ServletContext context = getServletContext();
       final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
-      final TransferFsImage ff = new TransferFsImage(pmap, request, response);
+      final GetImageParams parsedParams = new GetImageParams(request, response);
       final Configuration conf = 
         (Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
       
@@ -70,45 +83,77 @@ public class GetImageServlet extends Htt
             + request.getRemoteHost());
         return;
       }
-
+      
+      String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
+      String theirStorageInfoString = parsedParams.getStorageInfoString();
+      if (theirStorageInfoString != null &&
+          !myStorageInfoString.equals(theirStorageInfoString)) {
+        response.sendError(HttpServletResponse.SC_FORBIDDEN,
+            "This namenode has storage info " + myStorageInfoString + 
+            " but the secondary expected " + theirStorageInfoString);
+        LOG.warn("Received an invalid request file transfer request " +
+            "from a secondary with storage info " + theirStorageInfoString);
+        return;
+      }
+      
       UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
-          if (ff.getImage()) {
-            response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                               String.valueOf(nnImage.getStorage()
-                                              .getFsImageName().length()));
+          if (parsedParams.isGetImage()) {
+            long txid = parsedParams.getTxId();
+            File imageFile = nnImage.getStorage().getFsImageName(txid);
+            if (imageFile == null) {
+              throw new IOException("Could not find image with txid " + txid);
+            }
+            setVerificationHeaders(response, imageFile);
             // send fsImage
-            TransferFsImage.getFileServer(response.getOutputStream(),
-                                          nnImage.getStorage().getFsImageName(),
+            TransferFsImage.getFileServer(response.getOutputStream(), imageFile,
                 getThrottler(conf)); 
-          } else if (ff.getEdit()) {
-            response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                               String.valueOf(nnImage.getStorage()
-                                              .getFsEditName().length()));
+          } else if (parsedParams.isGetEdit()) {
+            long startTxId = parsedParams.getStartTxId();
+            long endTxId = parsedParams.getEndTxId();
+            
+            File editFile = nnImage.getStorage()
+                .findFinalizedEditsFile(startTxId, endTxId);
+            setVerificationHeaders(response, editFile);
+            
             // send edits
-            TransferFsImage.getFileServer(response.getOutputStream(),
-                                          nnImage.getStorage().getFsEditName(),
+            TransferFsImage.getFileServer(response.getOutputStream(), editFile,
                 getThrottler(conf));
-          } else if (ff.putImage()) {
-            // issue a HTTP get request to download the new fsimage 
-            nnImage.validateCheckpointUpload(ff.getToken());
-            nnImage.newImageDigest = ff.getNewChecksum();
-            MD5Hash downloadImageDigest = reloginIfNecessary().doAs(
-                new PrivilegedExceptionAction<MD5Hash>() {
-                @Override
-                public MD5Hash run() throws Exception {
-                  return TransferFsImage.getFileClient(
-                      ff.getInfoServer(), "getimage=1", 
-                      nnImage.getStorage().getFsImageNameCheckpoint(), true);
-                }
-            });
-            if (!nnImage.newImageDigest.equals(downloadImageDigest)) {
-              throw new IOException("The downloaded image is corrupt," +
-                  " expecting a checksum " + nnImage.newImageDigest +
-                  " but received a checksum " + downloadImageDigest);
+          } else if (parsedParams.isPutImage()) {
+            final long txid = parsedParams.getTxId();
+
+            if (! currentlyDownloadingCheckpoints.add(txid)) {
+              throw new IOException(
+                  "Another checkpointer is already in the process of uploading a" +
+                  " checkpoint made at transaction ID " + txid);
+            }
+
+            try {
+              if (nnImage.getStorage().findImageFile(txid) != null) {
+                throw new IOException(
+                    "Another checkpointer already uploaded an checkpoint " +
+                    "for txid " + txid);
+              }
+              
+              // issue a HTTP get request to download the new fsimage 
+              MD5Hash downloadImageDigest = reloginIfNecessary().doAs(
+                  new PrivilegedExceptionAction<MD5Hash>() {
+                  @Override
+                  public MD5Hash run() throws Exception {
+                    return TransferFsImage.downloadImageToStorage(
+                        parsedParams.getInfoServer(), txid,
+                        nnImage.getStorage(), true);
+                    }
+              });
+              nnImage.saveDigestAndRenameCheckpointImage(txid, downloadImageDigest);
+              
+              // Now that we have a new checkpoint, we might be able to
+              // remove some old ones.
+              nnImage.purgeOldStorage();
+            } finally {
+              currentlyDownloadingCheckpoints.remove(txid);
             }
-           nnImage.checkpointUploadDone();
           }
           return null;
         }
@@ -182,4 +227,148 @@ public class GetImageServlet extends Htt
     if(LOG.isDebugEnabled()) LOG.debug("isValidRequestor is rejecting: " + remoteUser);
     return false;
   }
+  
+  /**
+   * Set headers for content length, and, if available, md5.
+   * @throws IOException 
+   */
+  private void setVerificationHeaders(HttpServletResponse response, File file)
+  throws IOException {
+    response.setHeader(TransferFsImage.CONTENT_LENGTH,
+        String.valueOf(file.length()));
+    MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
+    if (hash != null) {
+      response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
+    }
+  }
+
+  static String getParamStringForImage(long txid,
+      StorageInfo remoteStorageInfo) {
+    return "getimage=1&" + TXID_PARAM + "=" + txid
+      + "&" + STORAGEINFO_PARAM + "=" +
+      remoteStorageInfo.toColonSeparatedString();
+    
+  }
+
+  static String getParamStringForLog(RemoteEditLog log,
+      StorageInfo remoteStorageInfo) {
+    return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId()
+        + "&" + END_TXID_PARAM + "=" + log.getEndTxId()
+        + "&" + STORAGEINFO_PARAM + "=" +
+          remoteStorageInfo.toColonSeparatedString();
+  }
+  
+  static String getParamStringToPutImage(long txid,
+      InetSocketAddress imageListenAddress, NNStorage storage) {
+    
+    return "putimage=1" +
+      "&" + TXID_PARAM + "=" + txid +
+      "&port=" + imageListenAddress.getPort() +
+      "&machine=" + imageListenAddress.getHostName()
+      + "&" + STORAGEINFO_PARAM + "=" +
+      storage.toColonSeparatedString();
+  }
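+
+  // Example (illustrative) query strings produced by the builders above:
+  //   getimage=1&txid=42&storageInfo=<colon-separated-storage-info>
+  //   getedit=1&startTxId=1&endTxId=42&storageInfo=<...>
+  //   putimage=1&txid=42&port=50090&machine=snn.example.com&storageInfo=<...>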
+
+  
+  static class GetImageParams {
+    private boolean isGetImage;
+    private boolean isGetEdit;
+    private boolean isPutImage;
+    private int remoteport;
+    private String machineName;
+    private long startTxId, endTxId, txId;
+    private String storageInfoString;
+
+    /**
+     * @param request the object from which this servlet reads the url contents
+     * @param response the object into which this servlet writes the url contents
+     * @throws IOException if the request is bad
+     */
+    public GetImageParams(HttpServletRequest request,
+                          HttpServletResponse response
+                           ) throws IOException {
+      @SuppressWarnings("unchecked")
+      Map<String, String[]> pmap = request.getParameterMap();
+      isGetImage = isGetEdit = isPutImage = false;
+      remoteport = 0;
+      machineName = null;
+
+      for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
+        String key = entry.getKey();
+        String[] val = entry.getValue();
+        if (key.equals("getimage")) { 
+          isGetImage = true;
+          txId = parseLongParam(request, TXID_PARAM);
+        } else if (key.equals("getedit")) { 
+          isGetEdit = true;
+          startTxId = parseLongParam(request, START_TXID_PARAM);
+          endTxId = parseLongParam(request, END_TXID_PARAM);
+        } else if (key.equals("putimage")) { 
+          isPutImage = true;
+          txId = parseLongParam(request, TXID_PARAM);
+        } else if (key.equals("port")) { 
+          remoteport = Integer.parseInt(val[0]);
+        } else if (key.equals("machine")) { 
+          machineName = val[0];
+        } else if (key.equals(STORAGEINFO_PARAM)) {
+          storageInfoString = val[0];
+        }
+      }
+
+      int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
+      if ((numGets > 1) || ((numGets == 0) && !isPutImage)) {
+        throw new IOException("Illegal parameters to TransferFsImage");
+      }
+    }
+
+    public String getStorageInfoString() {
+      return storageInfoString;
+    }
+
+    public long getTxId() {
+      Preconditions.checkState(isGetImage || isPutImage);
+      return txId;
+    }
+    
+    public long getStartTxId() {
+      Preconditions.checkState(isGetEdit);
+      return startTxId;
+    }
+    
+    public long getEndTxId() {
+      Preconditions.checkState(isGetEdit);
+      return endTxId;
+    }
+
+    boolean isGetEdit() {
+      return isGetEdit;
+    }
+
+    boolean isGetImage() {
+      return isGetImage;
+    }
+
+    boolean isPutImage() {
+      return isPutImage;
+    }
+    
+    String getInfoServer() throws IOException{
+      if (machineName == null || remoteport == 0) {
+        throw new IOException ("MachineName and port undefined");
+      }
+      return machineName + ":" + remoteport;
+    }
+    
+    private static long parseLongParam(HttpServletRequest request, String param)
+        throws IOException {
+      // Parse the named request parameter as a long value (e.g. the
+      // transaction ID identifying which image or edits file to fetch).
+      String paramStr = request.getParameter(param);
+      if (paramStr == null) {
+        throw new IOException("Invalid request has no " + param + " parameter");
+      }
+      
+      return Long.valueOf(paramStr);
+    }
+  }
 }

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
+
+/**
+ * A JournalManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
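+ *
+ * For example, the FileJournalManager in this commit stores each segment as
+ * edits_inprogress_<startTxid> while open and renames it to
+ * edits_<startTxid>-<endTxid> when finalized.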
+ */
+interface JournalManager {
+  /**
+   * Begin writing to a new segment of the log stream, which starts at
+   * the given transaction ID.
+   */
+  EditLogOutputStream startLogSegment(long txId) throws IOException;
+
+  /**
+   * Mark the log segment that spans from firstTxId to lastTxId
+   * as finalized and complete.
+   */
+  void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
+
+  /**
+   * Set the amount of memory that this stream should use to buffer edits
+   */
+  void setOutputBufferCapacity(int size);
+
+  /**
+   * The JournalManager may archive/purge any logs that contain only
+   * transactions less than minTxIdToKeep.
+   *
+   * @param minTxIdToKeep the earliest txid that must be retained after purging
+   *                      old logs
+   * @param purger the purging implementation to use
+   * @throws IOException if purging fails
+   */
+  void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+    throws IOException;
+
+  /**
+   * @return an EditLogInputStream that reads from the same log that
+   * the edit log is currently writing. May return null if this journal
+   * manager does not support this operation.
+   */  
+  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+    throws IOException;
+}

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Jul 29 16:28:45 2011
@@ -17,22 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
+import java.io.BufferedReader;
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -51,13 +49,17 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
-import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.DNS;
 
+import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * NNStorage is responsible for management of the StorageDirectories used by
  * the NameNode.
@@ -66,17 +68,19 @@ import org.apache.hadoop.net.DNS;
 public class NNStorage extends Storage implements Closeable {
   private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
 
-  static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
-
+  static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
+  
   //
   // The filenames used for storing the images
   //
   enum NameNodeFile {
     IMAGE     ("fsimage"),
-    TIME      ("fstime"),
+    TIME      ("fstime"), // from "old" pre-HDFS-1073 format
+    SEEN_TXID ("seen_txid"),
     EDITS     ("edits"),
     IMAGE_NEW ("fsimage.ckpt"),
-    EDITS_NEW ("edits.new");
+    EDITS_NEW ("edits.new"), // from "old" pre-HDFS-1073 format
+    EDITS_INPROGRESS ("edits_inprogress");
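+    // Transactional (post-HDFS-1073) filenames carry txids, e.g.
+    // (illustrative): fsimage_42, edits_1-42, edits_inprogress_43; the
+    // seen_txid file records a transaction ID any load plan must cover.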
 
     private String fileName = null;
     private NameNodeFile(String name) { this.fileName = name; }
@@ -106,42 +110,9 @@ public class NNStorage extends Storage i
     }
   }
 
-  /**
-   * Interface to be implemented by classes which make use of storage
-   * directories. They are  notified when a StorageDirectory is causing errors,
-   * becoming available or being formatted.
-   *
-   * This allows the implementors of the interface take their own specific
-   * action on the StorageDirectory when this occurs.
-   */
-  interface NNStorageListener {
-    /**
-     * An error has occurred with a StorageDirectory.
-     * @param sd The storage directory causing the error.
-     * @throws IOException
-     */
-    void errorOccurred(StorageDirectory sd) throws IOException;
-
-    /**
-     * A storage directory has been formatted.
-     * @param sd The storage directory being formatted.
-     * @throws IOException
-     */
-    void formatOccurred(StorageDirectory sd) throws IOException;
-
-    /**
-     * A storage directory is now available use.
-     * @param sd The storage directory which has become available.
-     * @throws IOException
-     */
-    void directoryAvailable(StorageDirectory sd) throws IOException;
-  }
-
-  final private List<NNStorageListener> listeners;
   private UpgradeManager upgradeManager = null;
-  protected MD5Hash imageDigest = null;
   protected String blockpoolID = ""; // id of the block pool
-
+  
   /**
    * flag that controls if we try to restore failed storages
    */
@@ -149,7 +120,13 @@ public class NNStorage extends Storage i
   private Object restorationLock = new Object();
   private boolean disablePreUpgradableLayoutCheck = false;
 
-  private long checkpointTime = -1L;  // The age of the image
+
+  /**
+   * TxId of the last transaction that was included in the most
+   * recent fsimage file. This does not include any transactions
+   * that have since been written to the edit log.
+   */
+  protected long mostRecentCheckpointTxId = FSConstants.INVALID_TXID;
 
   /**
    * list of failed (and thus removed) storages
@@ -158,27 +135,26 @@ public class NNStorage extends Storage i
     = new CopyOnWriteArrayList<StorageDirectory>();
 
   /**
-   * Construct the NNStorage.
-   * @param conf Namenode configuration.
+   * Properties from old layout versions that may be needed
+   * during upgrade only.
    */
-  public NNStorage(Configuration conf) {
-    super(NodeType.NAME_NODE);
-
-    storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
-    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
-  }
+  private HashMap<String, String> deprecatedProperties;
 
   /**
    * Construct the NNStorage.
-   * @param storageInfo storage information
-   * @param bpid block pool Id
+   * @param conf Namenode configuration.
+   * @param imageDirs Directories the image can be stored in.
+   * @param editsDirs Directories the editlog can be stored in.
+   * @throws IOException if any directories are inaccessible.
    */
-  public NNStorage(StorageInfo storageInfo, String bpid) {
-    super(NodeType.NAME_NODE, storageInfo);
+  public NNStorage(Configuration conf, 
+                   Collection<URI> imageDirs, Collection<URI> editsDirs) 
+      throws IOException {
+    super(NodeType.NAME_NODE);
 
     storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
-    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
-    this.blockpoolID = bpid;
+    
+    setStorageDirectories(imageDirs, editsDirs);
   }
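
For orientation, a construction call might look like the sketch below. The directory-list helpers named here are assumptions for illustration, not part of this patch:

    // Hypothetical call site; getNamespaceDirs/getNamespaceEditsDirs are
    // assumed to return the configured image and edits URIs.
    NNStorage storage = new NNStorage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));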
 
   @Override // Storage
@@ -207,7 +183,6 @@ public class NNStorage extends Storage i
 
   @Override // Closeable
   public void close() throws IOException {
-    listeners.clear();
     unlockAll();
     storageDirs.clear();
   }
@@ -232,10 +207,7 @@ public class NNStorage extends Storage i
 
   /**
   * See if any of the removed storages is "writable" again, and can be returned
-   * into service. If saveNamespace is set, then this method is being
-   * called from saveNamespace.
-   *
-   * @param saveNamespace Whether method is being called from saveNamespace()
+   * into service.
    */
   void attemptRestoreRemovedStorage() {
     // if directory is "alive" - copy the images there...
@@ -253,23 +225,10 @@ public class NNStorage extends Storage i
         LOG.info("currently disabled dir " + root.getAbsolutePath() +
                  "; type="+sd.getStorageDirType() 
                  + ";canwrite="+root.canWrite());
-        try {
-          
-          if(root.exists() && root.canWrite()) {
-            // when we try to restore we just need to remove all the data
-            // without saving current in-memory state (which could've changed).
-            sd.clearDirectory();
-            
-            LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
-            for (NNStorageListener listener : listeners) {
-              listener.directoryAvailable(sd);
-            }
-            
-            this.addStorageDir(sd); // restore
-            this.removedStorageDirs.remove(sd);
-          }
-        } catch(IOException e) {
-          LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
+        if(root.exists() && root.canWrite()) {
+          LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+          this.addStorageDir(sd); // restore
+          this.removedStorageDirs.remove(sd);
         }
       }
     }
@@ -283,9 +242,11 @@ public class NNStorage extends Storage i
   }
 
   /**
-   * Set the storage directories which will be used. NNStorage.close() should
-   * be called before this to ensure any previous storage directories have been
-   * freed.
+   * Set the storage directories which will be used. This should only ever be
+   * called from inside NNStorage. However, it needs to remain package private
+   * for testing: StorageDirectories must be reinitialised after Mockito.spy()
+   * is used on this class, because Mockito doesn't work well with inner
+   * classes such as StorageDirectory.
    *
    * Synchronized due to initialization of storageDirs and removedStorageDirs.
    *
@@ -293,6 +254,7 @@ public class NNStorage extends Storage i
    * @param fsEditsDirs Locations to store edit logs.
    * @throws IOException
    */
+  @VisibleForTesting
   synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
                                           Collection<URI> fsEditsDirs)
       throws IOException {
@@ -411,110 +373,84 @@ public class NNStorage extends Storage i
     }
     return list;
   }
-
+  
   /**
-   * Determine the checkpoint time of the specified StorageDirectory
+   * Determine the last transaction ID noted in this storage directory.
+   * This txid is stored in a special seen_txid file since it might not
+   * correspond to the latest image or edit log. For example, an image-only
+   * directory will have this txid incremented when edit logs roll, even
+   * though the edit logs are in a different directory.
    *
    * @param sd StorageDirectory to check
-   * @return If file exists and can be read, last checkpoint time. If not, 0L.
+   * @return If file exists and can be read, last recorded txid. If not, 0L.
    * @throws IOException On errors processing file pointed to by sd
    */
-  long readCheckpointTime(StorageDirectory sd) throws IOException {
-    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
-    long timeStamp = 0L;
-    if (timeFile.exists() && timeFile.canRead()) {
-      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+  static long readTransactionIdFile(StorageDirectory sd) throws IOException {
+    File txidFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
+    long txid = 0L;
+    if (txidFile.exists() && txidFile.canRead()) {
+      BufferedReader br = new BufferedReader(new FileReader(txidFile));
       try {
-        timeStamp = in.readLong();
+        txid = Long.valueOf(br.readLine());
       } finally {
-        in.close();
+        IOUtils.cleanup(LOG, br);
       }
     }
-    return timeStamp;
+    return txid;
   }
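
The seen_txid marker exists purely as a safety check at load time. A minimal sketch of the kind of guard a loader could build on it, where latestLoadableTxId is a hypothetical value computed from the images and edit logs actually found on disk:

    // Hypothetical guard, not part of this patch: refuse to start if the
    // directory claims to have seen more transactions than we can load.
    long seenTxId = NNStorage.readTransactionIdFile(sd);
    if (seenTxId > latestLoadableTxId) {
      throw new IOException("Storage " + sd + " recorded txid " + seenTxId
          + " but only transactions up to " + latestLoadableTxId
          + " are recoverable");
    }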
-
+  
   /**
   * Write the last seen transaction ID into a separate file.
    *
    * @param sd
    * @throws IOException
    */
-  public void writeCheckpointTime(StorageDirectory sd) throws IOException {
-    if (checkpointTime < 0L)
-      return; // do not write negative time
-    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
-    if (timeFile.exists() && ! timeFile.delete()) {
-        LOG.error("Cannot delete chekpoint time file: "
-                  + timeFile.getCanonicalPath());
-    }
-    FileOutputStream fos = new FileOutputStream(timeFile);
-    DataOutputStream out = new DataOutputStream(fos);
+  void writeTransactionIdFile(StorageDirectory sd, long txid) throws IOException {
+    Preconditions.checkArgument(txid >= 0, "bad txid: " + txid);
+    
+    File txIdFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
+    OutputStream fos = new AtomicFileOutputStream(txIdFile);
     try {
-      out.writeLong(checkpointTime);
-      out.flush();
-      fos.getChannel().force(true);
+      fos.write(String.valueOf(txid).getBytes());
+      fos.write('\n');
     } finally {
-      out.close();
+      IOUtils.cleanup(LOG, fos);
     }
   }
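
AtomicFileOutputStream is what keeps seen_txid from being corrupted by a crash mid-write. Roughly, it follows the standard write-to-temp-then-rename pattern; a simplified sketch of that idea (the real class lives in org.apache.hadoop.hdfs.util and differs in detail):

    // Sketch of write-then-rename; the rename is atomic on POSIX
    // filesystems, so readers see either the old seen_txid file or the
    // complete new one, never a truncated write.
    File tmp = new File(txIdFile.getParentFile(), txIdFile.getName() + ".tmp");
    FileOutputStream out = new FileOutputStream(tmp);
    try {
      out.write(String.valueOf(txid).getBytes());
      out.write('\n');
      out.getChannel().force(true); // flush to disk before the rename
    } finally {
      out.close();
    }
    if (!tmp.renameTo(txIdFile)) {
      throw new IOException("Could not rename " + tmp + " to " + txIdFile);
    }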
 
   /**
-   * Record new checkpoint time in order to
-   * distinguish healthy directories from the removed ones.
-   * If there is an error writing new checkpoint time, the corresponding
-   * storage directory is removed from the list.
+   * Set the transaction ID of the last checkpoint
    */
-  public void incrementCheckpointTime() {
-    setCheckpointTimeInStorage(checkpointTime + 1);
+  void setMostRecentCheckpointTxId(long txid) {
+    this.mostRecentCheckpointTxId = txid;
   }
 
   /**
-   * The age of the namespace state.<p>
-   * Reflects the latest time the image was saved.
-   * Modified with every save or a checkpoint.
-   * Persisted in VERSION file.
-   *
-   * @return the current checkpoint time.
+   * Return the transaction ID of the last checkpoint.
    */
-  public long getCheckpointTime() {
-    return checkpointTime;
+  long getMostRecentCheckpointTxId() {
+    return mostRecentCheckpointTxId;
   }
 
   /**
-   * Set the checkpoint time.
-   *
-   * This method does not persist the checkpoint time to storage immediately.
+   * Write a small file in all available storage directories that
+   * indicates that the namespace has reached some given transaction ID.
    * 
-   * @see #setCheckpointTimeInStorage
-   * @param newCpT the new checkpoint time.
-   */
-  public void setCheckpointTime(long newCpT) {
-    checkpointTime = newCpT;
-  }
-
-  /**
-   * Set the current checkpoint time. Writes the new checkpoint
-   * time to all available storage directories.
-   * @param newCpT The new checkpoint time.
-   */
-  public void setCheckpointTimeInStorage(long newCpT) {
-    checkpointTime = newCpT;
-    // Write new checkpoint time in all storage directories
-    for(Iterator<StorageDirectory> it =
-                          dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
+   * This is used when the image is loaded to avoid accidental rollbacks
+   * in the case where an edit log is fully deleted but there is no
+   * checkpoint. See TestNameEditsConfigs.testNameEditsConfigsFailure()
+   * @param txid the txid that has been reached
+   */
+  public void writeTransactionIdFileToStorage(long txid) {
+    // Write txid marker in all storage directories
+    for (StorageDirectory sd : storageDirs) {
       try {
-        writeCheckpointTime(sd);
+        writeTransactionIdFile(sd, txid);
       } catch(IOException e) {
         // Close any edits stream associated with this dir and remove directory
-        LOG.warn("incrementCheckpointTime failed on "
-                 + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
-        try {
-          reportErrorsOnDirectory(sd);
-        } catch (IOException ioe) {
-            LOG.error("Failed to report and remove NN storage directory "
-                      + sd.getRoot().getPath(), ioe);
-        }
+        LOG.warn("writeTransactionIdToStorage failed on " + sd,
+            e);
+        reportErrorsOnDirectory(sd);
       }
     }
   }
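
A plausible call site, sketched here as an assumption (the edit-log accessor name is illustrative): after the edit log rolls, the new highest txid is recorded in every directory so a later load can detect missing segments.

    // Hypothetical: record progress in all storage dirs after a roll.
    storage.writeTransactionIdFileToStorage(editLog.getLastWrittenTxId());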
@@ -525,11 +461,11 @@ public class NNStorage extends Storage i
    *
    * @return List of filenames to save checkpoints to.
    */
-  public File[] getFsImageNameCheckpoint() {
+  public File[] getFsImageNameCheckpoint(long txid) {
     ArrayList<File> list = new ArrayList<File>();
     for (Iterator<StorageDirectory> it =
                  dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW));
+      list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW, txid));
     }
     return list.toArray(new File[list.size()]);
   }
@@ -538,51 +474,24 @@ public class NNStorage extends Storage i
    * Return the name of the image file.
    * @return The name of the first image file.
    */
-  public File getFsImageName() {
+  public File getFsImageName(long txid) {
     StorageDirectory sd = null;
     for (Iterator<StorageDirectory> it =
       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       sd = it.next();
-      File fsImage = getStorageFile(sd, NameNodeFile.IMAGE);
+      File fsImage = getStorageFile(sd, NameNodeFile.IMAGE, txid);
       if(sd.getRoot().canRead() && fsImage.exists())
         return fsImage;
     }
     return null;
   }
 
-  /**
-   * @return The name of the first editlog file.
-   */
-  public File getFsEditName() throws IOException {
-    for (Iterator<StorageDirectory> it
-           = dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      if(sd.getRoot().canRead())
-        return getEditFile(sd);
-    }
-    return null;
-  }
-
-  /**
-   * @return The name of the first time file.
-   */
-  public File getFsTimeName() {
-    StorageDirectory sd = null;
-    // NameNodeFile.TIME shoul be same on all directories
-    for (Iterator<StorageDirectory> it =
-             dirIterator(); it.hasNext();)
-      sd = it.next();
-    return getStorageFile(sd, NameNodeFile.TIME);
-  }
-
   /** Create new dfs name directory.  Caution: this destroys all files
    * in this filesystem. */
   private void format(StorageDirectory sd) throws IOException {
     sd.clearDirectory(); // create current dir
-    for (NNStorageListener listener : listeners) {
-      listener.formatOccurred(sd);
-    }
     writeProperties(sd);
+    writeTransactionIdFile(sd, 0);
 
     LOG.info("Storage directory " + sd.getRoot()
              + " has been successfully formatted.");
@@ -597,7 +506,6 @@ public class NNStorage extends Storage i
     this.clusterID = clusterId;
     this.blockpoolID = newBlockPoolID();
     this.cTime = 0L;
-    this.setCheckpointTime(now());
     for (Iterator<StorageDirectory> it =
                            dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -624,50 +532,6 @@ public class NNStorage extends Storage i
     return newID;
   }
 
-
-  /**
-   * Move {@code current} to {@code lastcheckpoint.tmp} and
-   * recreate empty {@code current}.
-   * {@code current} is moved only if it is well formatted,
-   * that is contains VERSION file.
-   *
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
-   */
-  protected void moveCurrent(StorageDirectory sd)
-    throws IOException {
-    File curDir = sd.getCurrentDir();
-    File tmpCkptDir = sd.getLastCheckpointTmp();
-    // mv current -> lastcheckpoint.tmp
-    // only if current is formatted - has VERSION file
-    if(sd.getVersionFile().exists()) {
-      assert curDir.exists() : curDir + " directory must exist.";
-      assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
-      rename(curDir, tmpCkptDir);
-    }
-    // recreate current
-    if(!curDir.exists() && !curDir.mkdir())
-      throw new IOException("Cannot create directory " + curDir);
-  }
-
-  /**
-   * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint}
-   *
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
-   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
-   */
-  protected void moveLastCheckpoint(StorageDirectory sd)
-    throws IOException {
-    File tmpCkptDir = sd.getLastCheckpointTmp();
-    File prevCkptDir = sd.getPreviousCheckpoint();
-    // remove previous.checkpoint
-    if (prevCkptDir.exists())
-      deleteDir(prevCkptDir);
-    // mv lastcheckpoint.tmp -> previous.checkpoint
-    if(tmpCkptDir.exists())
-      rename(tmpCkptDir, prevCkptDir);
-  }
-
   @Override // Storage
   protected void setFieldsFromProperties(
       Properties props, StorageDirectory sd) throws IOException {
@@ -689,26 +553,35 @@ public class NNStorage extends Storage i
     setDistributedUpgradeState(
         sDUS == null? false : Boolean.parseBoolean(sDUS),
         sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
+    setDeprecatedPropertiesForUpgrade(props);
+  }
 
-    String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
-    if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
-      if (sMd5 == null) {
-        throw new InconsistentFSStateException(sd.getRoot(),
-            "file " + STORAGE_FILE_VERSION
-            + " does not have MD5 image digest.");
-      }
-      this.imageDigest = new MD5Hash(sMd5);
-    } else if (sMd5 != null) {
-      throw new InconsistentFSStateException(sd.getRoot(),
-          "file " + STORAGE_FILE_VERSION +
-          " has image MD5 digest when version is " + layoutVersion);
+  /**
+   * Pull any properties out of the VERSION file that are from older
+   * versions of HDFS and only necessary during upgrade.
+   */
+  private void setDeprecatedPropertiesForUpgrade(Properties props) {
+    deprecatedProperties = new HashMap<String, String>();
+    String md5 = props.getProperty(DEPRECATED_MESSAGE_DIGEST_PROPERTY);
+    if (md5 != null) {
+      deprecatedProperties.put(DEPRECATED_MESSAGE_DIGEST_PROPERTY, md5);
     }
-
-    this.setCheckpointTime(readCheckpointTime(sd));
+  }
+  
+  /**
+   * Return a property that was stored in an earlier version of HDFS.
+   * 
+   * This should only be used during upgrades.
+   */
+  String getDeprecatedProperty(String prop) {
+    assert getLayoutVersion() > FSConstants.LAYOUT_VERSION :
+      "getDeprecatedProperty should only be done when loading " +
+      "storage from past versions during upgrade.";
+    return deprecatedProperties.get(prop);
   }
 
   /**
-   * Write last checkpoint time and version file into the storage directory.
+   * Write version file into the storage directory.
    *
    * The version file should always be written last.
    * Missing or corrupted version file indicates that
@@ -733,50 +606,109 @@ public class NNStorage extends Storage i
       props.setProperty("distributedUpgradeVersion",
                         Integer.toString(uVersion));
     }
-    if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
-      // Though the current NN supports this feature, this function
-      // is called with old layoutVersions from the upgrade tests.
-      if (imageDigest == null) {
-        // May be null on the first save after an upgrade.
-        imageDigest = MD5Hash.digest(
-            new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE)));
-      }
-      props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
-    }
-
-    writeCheckpointTime(sd);
   }
-
+  
+  static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
+    return new File(sd.getCurrentDir(),
+                    String.format("%s_%019d", type.getName(), imageTxId));
+  }
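
The %019d zero-padding is wide enough for any positive long (Long.MAX_VALUE has 19 digits) and keeps lexicographic file-name order identical to numeric txid order, so a plain directory listing sorts correctly. Illustrative only:

    String.format("%s_%019d", "fsimage", 42);
    // => "fsimage_0000000000000000042"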
+  
   /**
-   * @return A File of 'type' in storage directory 'sd'.
+   * Get a storage file for one of the files that doesn't need a txid associated
+   * (e.g. VERSION, seen_txid).
    */
   static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
     return new File(sd.getCurrentDir(), type.getName());
   }
 
+  @VisibleForTesting
+  public static String getCheckpointImageFileName(long txid) {
+    return String.format("%s_%019d",
+                         NameNodeFile.IMAGE_NEW.getName(), txid);
+  }
+
+  @VisibleForTesting
+  public static String getImageFileName(long txid) {
+    return String.format("%s_%019d",
+                         NameNodeFile.IMAGE.getName(), txid);
+  }
+  
+  @VisibleForTesting
+  public static String getInProgressEditsFileName(long startTxId) {
+    return String.format("%s_%019d", NameNodeFile.EDITS_INPROGRESS.getName(),
+                         startTxId);
+  }
+  
+  static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {
+    return new File(sd.getCurrentDir(), getInProgressEditsFileName(startTxId));
+  }
+  
+  static File getFinalizedEditsFile(StorageDirectory sd,
+      long startTxId, long endTxId) {
+    return new File(sd.getCurrentDir(),
+        getFinalizedEditsFileName(startTxId, endTxId));
+  }
+  
+  static File getImageFile(StorageDirectory sd, long txid) {
+    return new File(sd.getCurrentDir(),
+        getImageFileName(txid));
+  }
+  
+  @VisibleForTesting
+  public static String getFinalizedEditsFileName(long startTxId, long endTxId) {
+    return String.format("%s_%019d-%019d", NameNodeFile.EDITS.getName(),
+                         startTxId, endTxId);
+  }
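
Taken together, these helpers define the on-disk naming scheme of the txid-based layout. Illustrative values:

    getImageFileName(42);              // "fsimage_0000000000000000042"
    getInProgressEditsFileName(43);    // "edits_inprogress_0000000000000000043"
    getFinalizedEditsFileName(43, 97); // "edits_0000000000000000043-0000000000000000097"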
+  
   /**
-   * @return A editlog File in storage directory 'sd'.
+   * Return the first readable finalized edits file for the given txid.
    */
-  File getEditFile(StorageDirectory sd) {
-    return getStorageFile(sd, NameNodeFile.EDITS);
+  File findFinalizedEditsFile(long startTxId, long endTxId)
+  throws IOException {
+    File ret = findFile(NameNodeDirType.EDITS,
+        getFinalizedEditsFileName(startTxId, endTxId));
+    if (ret == null) {
+      throw new IOException(
+          "No edits file for txid " + startTxId + "-" + endTxId + " exists!");
+    }
+    return ret;
+  }
+    
+  /**
+   * Return the first readable image file for the given txid, or null
+   * if no such image can be found
+   */
+  File findImageFile(long txid) throws IOException {
+    return findFile(NameNodeDirType.IMAGE,
+        getImageFileName(txid));
   }
 
   /**
-   * @return A temporary editlog File in storage directory 'sd'.
+   * Return the first readable storage file of the given name
+   * across any of the 'current' directories in SDs of the
+   * given type, or null if no such file exists.
    */
-  File getEditNewFile(StorageDirectory sd) {
-    return getStorageFile(sd, NameNodeFile.EDITS_NEW);
+  private File findFile(NameNodeDirType dirType, String name) {
+    for (StorageDirectory sd : dirIterable(dirType)) {
+      File candidate = new File(sd.getCurrentDir(), name);
+      if (sd.getCurrentDir().canRead() &&
+          candidate.exists()) {
+        return candidate;
+      }
+    }
+    return null;
   }
 
   /**
-   * @return A list of all Files of 'type' in available storage directories.
+   * @return A list of the given File in every available storage directory,
+   * regardless of whether the file actually exists.
    */
-  Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
+  List<File> getFiles(NameNodeDirType dirType, String fileName) {
     ArrayList<File> list = new ArrayList<File>();
     Iterator<StorageDirectory> it =
       (dirType == null) ? dirIterator() : dirIterator(dirType);
     for ( ;it.hasNext(); ) {
-      list.add(getStorageFile(it.next(), type));
+      list.add(new File(it.next().getCurrentDir(), fileName));
     }
     return list;
   }
@@ -809,7 +741,9 @@ public class NNStorage extends Storage i
    * @param uVersion the new version.
    */
   private void setDistributedUpgradeState(boolean uState, int uVersion) {
-    upgradeManager.setUpgradeState(uState, uVersion);
+    if (upgradeManager != null) {
+      upgradeManager.setUpgradeState(uState, uVersion);
+    }
   }
 
   /**
@@ -850,33 +784,6 @@ public class NNStorage extends Storage i
   }
 
   /**
-   * Set the digest for the latest image stored by NNStorage.
-   * @param digest The digest for the image.
-   */
-  void setImageDigest(MD5Hash digest) {
-    this.imageDigest = digest;
-  }
-
-  /**
-   * Get the digest for the latest image storage by NNStorage.
-   * @return The digest for the latest image.
-   */
-  MD5Hash getImageDigest() {
-    return imageDigest;
-  }
-
-  /**
-   * Register a listener. The listener will be notified of changes to the list
-   * of available storage directories.
-   *
-   * @see NNStorageListener
-   * @param sel A storage listener.
-   */
-  void registerListener(NNStorageListener sel) {
-    listeners.add(sel);
-  }
-
-  /**
    * Disable the check for pre-upgradable layouts. Needed for BackupImage.
    * @param val Whether to disable the preupgradeable layout check.
    */
@@ -890,7 +797,7 @@ public class NNStorage extends Storage i
    * @param sds A list of storage directories to mark as errored.
    * @throws IOException
    */
-  void reportErrorsOnDirectories(List<StorageDirectory> sds) throws IOException {
+  void reportErrorsOnDirectories(List<StorageDirectory> sds) {
     for (StorageDirectory sd : sds) {
       reportErrorsOnDirectory(sd);
     }
@@ -904,17 +811,12 @@ public class NNStorage extends Storage i
    * @param sd A storage directory to mark as errored.
    * @throws IOException
    */
-  void reportErrorsOnDirectory(StorageDirectory sd)
-      throws IOException {
+  void reportErrorsOnDirectory(StorageDirectory sd) {
     LOG.error("Error reported on storage directory " + sd);
 
     String lsd = listStorageDirectories();
     LOG.debug("current list of storage dirs:" + lsd);
 
-    for (NNStorageListener listener : listeners) {
-      listener.errorOccurred(sd);
-    }
-
     LOG.warn("About to remove corresponding storage: "
              + sd.getRoot().getAbsolutePath());
     try {
@@ -927,8 +829,7 @@ public class NNStorage extends Storage i
     if (this.storageDirs.remove(sd)) {
       this.removedStorageDirs.add(sd);
     }
-    incrementCheckpointTime();
-
+    
     lsd = listStorageDirectories();
     LOG.debug("at the end current list of storage dirs:" + lsd);
   }
@@ -968,6 +869,29 @@ public class NNStorage extends Storage i
   }
   
   /**
+   * Report that an IOE has occurred on some file which may
+   * or may not be within one of the NN image storage directories.
+   */
+  void reportErrorOnFile(File f) {
+    // We use getAbsolutePath here instead of getCanonicalPath since we know
+    // that there is some IO problem on that drive.
+    // getCanonicalPath may need to call stat() or readlink() and it's likely
+    // those calls would fail due to the same underlying IO problem.
+    String absPath = f.getAbsolutePath();
+    for (StorageDirectory sd : storageDirs) {
+      String dirPath = sd.getRoot().getAbsolutePath();
+      if (!dirPath.endsWith("/")) {
+        dirPath += "/";
+      }
+      if (absPath.startsWith(dirPath)) {
+        reportErrorsOnDirectory(sd);
+        return;
+      }
+    }
+    
+  }
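
The trailing "/" appended to dirPath matters: a bare prefix test would let a sibling directory with a shared name prefix shadow a storage root. For example:

    "/data/nn1-backup/current/VERSION".startsWith("/data/nn1");  // true  (wrong match)
    "/data/nn1-backup/current/VERSION".startsWith("/data/nn1/"); // false (correct)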
+  
+  /**
    * Generate new clusterID.
    * 
    * clusterID is a persistent attribute of the cluster.
@@ -1065,4 +989,67 @@ public class NNStorage extends Storage i
   public String getBlockPoolID() {
     return blockpoolID;
   }
+
+  /**
+   * Iterate over all current storage directories, inspecting them
+   * with the given inspector.
+   */
+  void inspectStorageDirs(FSImageStorageInspector inspector)
+      throws IOException {
+
+    // Process each of the storage directories to find the pair of
+    // newest image file and edit file
+    for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      inspector.inspectDirectory(sd);
+    }
+  }
+
+  /**
+   * Iterate over all of the storage dirs, reading their contents to determine
+   * their layout versions. Returns an FSImageStorageInspector which has
+   * inspected each directory.
+   * 
+   * <b>Note:</b> this can mutate the storage info fields (ctime, version, etc).
+   * @throws IOException if no valid storage dirs are found
+   */
+  FSImageStorageInspector readAndInspectDirs()
+      throws IOException {
+    int minLayoutVersion = Integer.MAX_VALUE; // the newest
+    int maxLayoutVersion = Integer.MIN_VALUE; // the oldest
+    
+    // First determine what range of layout versions we're going to inspect
+    for (Iterator<StorageDirectory> it = dirIterator();
+         it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if (!sd.getVersionFile().exists()) {
+        FSImage.LOG.warn("Storage directory " + sd + " contains no VERSION file. Skipping...");
+        continue;
+      }
+      readProperties(sd); // sets layoutVersion
+      minLayoutVersion = Math.min(minLayoutVersion, getLayoutVersion());
+      maxLayoutVersion = Math.max(maxLayoutVersion, getLayoutVersion());
+    }
+    
+    if (minLayoutVersion > maxLayoutVersion) {
+      throw new IOException("No storage directories contained VERSION information");
+    }
+    assert minLayoutVersion <= maxLayoutVersion;
+    
+    // If we have any storage directories with the new layout version
+    // (i.e. edits_<txid>) then use the new inspector, which will ignore
+    // the old format dirs.
+    FSImageStorageInspector inspector;
+    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, minLayoutVersion)) {
+      inspector = new FSImageTransactionalStorageInspector();
+      if (!LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, maxLayoutVersion)) {
+        FSImage.LOG.warn("Ignoring one or more storage directories with old layouts");
+      }
+    } else {
+      inspector = new FSImagePreTransactionalStorageInspector();
+    }
+    
+    inspectStorageDirs(inspector);
+    return inspector;
+  }
 }
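
A sketch of how a loader might use this, under the assumption that it then asks the returned inspector for a load plan:

    FSImageStorageInspector inspector = storage.readAndInspectDirs();
    // The inspector now knows, per directory, the newest image and which
    // edit logs would be needed to bring the namespace up to date.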

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * The NNStorageRetentionManager is responsible for inspecting the storage
+ * directories of the NN and enforcing a retention policy on checkpoints
+ * and edit logs.
+ * 
+ * It delegates the actual removal of files to a StoragePurger
+ * implementation, which might delete the files or instead copy them to
+ * a filer or HDFS for later analysis.
+ */
+public class NNStorageRetentionManager {
+  
+  private final int numCheckpointsToRetain;
+  private static final Log LOG = LogFactory.getLog(
+      NNStorageRetentionManager.class);
+  private final NNStorage storage;
+  private final StoragePurger purger;
+  private final FSEditLog editLog;
+  
+  public NNStorageRetentionManager(
+      Configuration conf,
+      NNStorage storage,
+      FSEditLog editLog,
+      StoragePurger purger) {
+    this.numCheckpointsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT);
+    this.storage = storage;
+    this.editLog = editLog;
+    this.purger = purger;
+  }
+  
+  public NNStorageRetentionManager(Configuration conf, NNStorage storage,
+      FSEditLog editLog) {
+    this(conf, storage, editLog, new DeletionStoragePurger());
+  }
+
+  public void purgeOldStorage() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+      new FSImageTransactionalStorageInspector();
+    storage.inspectStorageDirs(inspector);
+
+    long minImageTxId = getImageTxIdToRetain(inspector);
+    purgeCheckpointsOlderThan(inspector, minImageTxId);
+    // If fsimage_N is the image we want to keep, then we need to keep
+    // all txns > N. We can remove anything < N+1, since fsimage_N
+    // reflects the state up to and including N.
+    editLog.purgeLogsOlderThan(minImageTxId + 1, purger);
+  }
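
A worked example of the retention arithmetic, with assumed values: if images exist at txids 100, 200 and 300 and numCheckpointsToRetain is 2, getImageTxIdToRetain returns 200. The image at txid 100 is then purged, and purgeLogsOlderThan(201, ...) removes only edit logs whose last transaction is at most 200, all of which fsimage_...200 already reflects.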
+  
+  private void purgeCheckpointsOlderThan(
+      FSImageTransactionalStorageInspector inspector,
+      long minTxId) {
+    for (FoundFSImage image : inspector.getFoundImages()) {
+      if (image.getTxId() < minTxId) {
+        LOG.info("Purging old image " + image);
+        purger.purgeImage(image);
+      }
+    }
+  }
+
+  /**
+   * @param inspector inspector that has already inspected all storage dirs
+   * @return the transaction ID corresponding to the oldest checkpoint
+   * that should be retained. 
+   */
+  private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
+      
+    List<FoundFSImage> images = inspector.getFoundImages();
+    TreeSet<Long> imageTxIds = Sets.newTreeSet();
+    for (FoundFSImage image : images) {
+      imageTxIds.add(image.getTxId());
+    }
+    
+    List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
+    if (imageTxIdsList.isEmpty()) {
+      return 0;
+    }
+    
+    Collections.reverse(imageTxIdsList);
+    int toRetain = Math.min(numCheckpointsToRetain, imageTxIdsList.size());    
+    long minTxId = imageTxIdsList.get(toRetain - 1);
+    LOG.info("Going to retain " + toRetain + " images with txid >= " +
+        minTxId);
+    return minTxId;
+  }
+  
+  /**
+   * Interface responsible for disposing of old checkpoints and edit logs.
+   */
+  static interface StoragePurger {
+    void purgeLog(FoundEditLog log);
+    void purgeImage(FoundFSImage image);
+  }
+  
+  static class DeletionStoragePurger implements StoragePurger {
+    @Override
+    public void purgeLog(FoundEditLog log) {
+      deleteOrWarn(log.getFile());
+    }
+
+    @Override
+    public void purgeImage(FoundFSImage image) {
+      deleteOrWarn(image.getFile());
+      deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
+    }
+
+    private static void deleteOrWarn(File file) {
+      if (!file.delete()) {
+        // It's OK if we fail to delete something -- we'll catch it
+        // next time we swing through this directory.
+        LOG.warn("Could not delete " + file);
+      }      
+    }
+  }
+}
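
The StoragePurger indirection is what allows the "copy to a filer" alternative mentioned in the class javadoc. A minimal sketch of such a purger; the class name and archive destination are assumptions for illustration:

    // Hypothetical purger: move files aside instead of deleting them.
    static class ArchivingStoragePurger implements StoragePurger {
      private final File archiveDir;

      ArchivingStoragePurger(File archiveDir) {
        this.archiveDir = archiveDir;
      }

      @Override
      public void purgeLog(FoundEditLog log) {
        archive(log.getFile());
      }

      @Override
      public void purgeImage(FoundFSImage image) {
        archive(image.getFile());
      }

      private void archive(File f) {
        File dest = new File(archiveDir, f.getName());
        if (!f.renameTo(dest)) {
          LOG.warn("Could not archive " + f + " to " + dest);
        }
      }
    }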