You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC
svn commit: r1152295 [4/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Storage inspector for directories laid out in the transactional format,
+ * where images are named fsimage_N and edit logs are named edits_N-M
+ * (finalized) or edits_inprogress_N (in progress).
+ *
+ * Each call to {@link #inspectDirectory(StorageDirectory)} records the images,
+ * edit logs and seen_txid value found in one directory. Afterwards,
+ * {@link #createLoadPlan()} picks the newest image and the chain of edit log
+ * groups that follows it.
+ */
+class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
+  public static final Log LOG = LogFactory.getLog(
+      FSImageTransactionalStorageInspector.class);
+
+  /** Set when some inspected directory had no VERSION file. */
+  private boolean needToSave = false;
+  /** Cleared once any inspected directory still has a "previous" dir. */
+  private boolean isUpgradeFinalized = true;
+
+  /** Images found in directories configured to hold images. */
+  List<FoundFSImage> foundImages = new ArrayList<FoundFSImage>();
+  /** Edit logs found in directories configured to hold edits. */
+  List<FoundEditLog> foundEditLogs = new ArrayList<FoundEditLog>();
+  /** The same edit logs, grouped by starting txid in ascending order. */
+  SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
+  /** Largest value read from any directory's seen_txid file. */
+  long maxSeenTxId = 0;
+
+  private static final Pattern IMAGE_REGEX = Pattern.compile(
+      NameNodeFile.IMAGE.getName() + "_(\\d+)");
+  private static final Pattern EDITS_REGEX = Pattern.compile(
+      NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
+  private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
+      NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+
+  /**
+   * Scans the current directory of {@code sd} and records:
+   * <ul>
+   * <li>the images it holds, if it is an IMAGE directory;</li>
+   * <li>the edit logs it holds, if it is an EDITS directory;</li>
+   * <li>its seen_txid value;</li>
+   * <li>whether it still has a "previous" directory.</li>
+   * </ul>
+   *
+   * A directory without a VERSION file is treated as just formatted and only
+   * sets {@link #needToSave()}. A directory whose listing fails is logged
+   * and skipped.
+   */
+  @Override
+  public void inspectDirectory(StorageDirectory sd) throws IOException {
+    // Was the directory just formatted?
+    if (!sd.getVersionFile().exists()) {
+      LOG.info("No version file in " + sd.getRoot());
+      needToSave = true;
+      return;
+    }
+
+    File currentDir = sd.getCurrentDir();
+    File[] filesInStorage;
+    try {
+      filesInStorage = FileUtil.listFiles(currentDir);
+    } catch (IOException ioe) {
+      LOG.warn("Unable to inspect storage directory " + currentDir,
+          ioe);
+      return;
+    }
+
+    for (File f : filesInStorage) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking file " + f);
+      }
+      String name = f.getName();
+
+      // Check for fsimage_*
+      Matcher imageMatch = IMAGE_REGEX.matcher(name);
+      if (imageMatch.matches()) {
+        if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
+          try {
+            long txid = Long.parseLong(imageMatch.group(1));
+            foundImages.add(new FoundFSImage(sd, f, txid));
+          } catch (NumberFormatException nfe) {
+            // The regex only admits digits, so this means the txid does not
+            // fit in a long. Skip the file.
+            LOG.error("Image file " + f + " has improperly formatted " +
+                "transaction ID");
+          }
+        } else {
+          LOG.warn("Found image file at " + f + " but storage directory is " +
+              "not configured to contain images.");
+        }
+      }
+    }
+
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
+    }
+
+    List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      for (FoundEditLog log : editLogs) {
+        addEditLog(log);
+      }
+    } else if (!editLogs.isEmpty()) {
+      LOG.warn("Found the following edit log file(s) in " + sd +
+          " even though it was not configured to store edits:\n" +
+          " " + Joiner.on("\n ").join(editLogs));
+    }
+
+    // The upgrade counts as finalized only if no directory kept "previous".
+    isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
+  }
+
+  /**
+   * Selects the edit logs among {@code filesInStorage} by file name.
+   *
+   * Finalized logs (edits_N-M) get end txid M. In-progress logs
+   * (edits_inprogress_N) get {@link FoundEditLog#UNKNOWN_END}. Names whose
+   * txids cannot be parsed are logged and skipped.
+   *
+   * @param filesInStorage candidate files; only their names are examined
+   * @return the edit logs found, in the order the files were given
+   */
+  static List<FoundEditLog> matchEditLogs(File[] filesInStorage) {
+    List<FoundEditLog> ret = Lists.newArrayList();
+    for (File f : filesInStorage) {
+      String name = f.getName();
+      // Check for finalized edits
+      Matcher editsMatch = EDITS_REGEX.matcher(name);
+      if (editsMatch.matches()) {
+        try {
+          long startTxId = Long.parseLong(editsMatch.group(1));
+          long endTxId = Long.parseLong(editsMatch.group(2));
+          ret.add(new FoundEditLog(f, startTxId, endTxId));
+        } catch (NumberFormatException nfe) {
+          LOG.error("Edits file " + f + " has improperly formatted " +
+              "transaction ID");
+          // skip
+        }
+      }
+
+      // Check for in-progress edits
+      Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
+      if (inProgressEditsMatch.matches()) {
+        try {
+          long startTxId = Long.parseLong(inProgressEditsMatch.group(1));
+          ret.add(
+              new FoundEditLog(f, startTxId, FoundEditLog.UNKNOWN_END));
+        } catch (NumberFormatException nfe) {
+          LOG.error("In-progress edits file " + f + " has improperly " +
+              "formatted transaction ID");
+          // skip
+        }
+      }
+    }
+    return ret;
+  }
+
+  /** Records a log and files it under the group for its starting txid. */
+  private void addEditLog(FoundEditLog foundEditLog) {
+    foundEditLogs.add(foundEditLog);
+    LogGroup group = logGroups.get(foundEditLog.startTxId);
+    if (group == null) {
+      group = new LogGroup(foundEditLog.startTxId);
+      logGroups.put(foundEditLog.startTxId, group);
+    }
+    group.add(foundEditLog);
+  }
+
+  @Override
+  public boolean isUpgradeFinalized() {
+    return isUpgradeFinalized;
+  }
+
+  /**
+   * @return the image that has the most recent associated transaction ID.
+   * If there are multiple storage directories which contain equal images
+   * the storage directory that was inspected first will be preferred.
+   *
+   * Returns null if no images were found.
+   */
+  FoundFSImage getLatestImage() {
+    FoundFSImage ret = null;
+    for (FoundFSImage img : foundImages) {
+      // Strict '>' keeps the earliest-inspected image on ties.
+      if (ret == null || img.txId > ret.txId) {
+        ret = img;
+      }
+    }
+    return ret;
+  }
+
+  /** @return an immutable snapshot of the images found so far */
+  public List<FoundFSImage> getFoundImages() {
+    return ImmutableList.copyOf(foundImages);
+  }
+
+  /** @return an immutable snapshot of the edit logs found so far */
+  public List<FoundEditLog> getFoundEditLogs() {
+    return ImmutableList.copyOf(foundEditLogs);
+  }
+
+  /**
+   * Plans a load of the newest image followed by every contiguous log group
+   * that starts after it.
+   *
+   * @throws FileNotFoundException if no image was found
+   * @throws IOException if the log plan cannot be built
+   */
+  @Override
+  public LoadPlan createLoadPlan() throws IOException {
+    if (foundImages.isEmpty()) {
+      throw new FileNotFoundException("No valid image files found");
+    }
+
+    FoundFSImage recoveryImage = getLatestImage();
+    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
+
+    return new TransactionalLoadPlan(recoveryImage,
+        logPlan);
+  }
+
+  /**
+   * Plan which logs to load in order to bring the namespace up-to-date.
+   *
+   * Only log groups whose starting txid lies in the half-open range
+   * [sinceTxId + 1, maxStartTxId) are considered. The first such group must
+   * start at sinceTxId + 1. After a finalized group, the next group must
+   * start right after that group's last txid.
+   *
+   * @param sinceTxId the highest txid that is already loaded
+   *                  (eg from the image checkpoint)
+   * @param maxStartTxId ignore any log groups that start at or after this txid
+   * @return the logs to load, plus the groups to recover before loading
+   * @throws IOException in any of these cases:
+   *         <ul>
+   *         <li>finalized logs in one group disagree on their end txid;</li>
+   *         <li>validating a log fails;</li>
+   *         <li>there is a gap between groups;</li>
+   *         <li>a seen_txid file shows a segment newer than any log found.</li>
+   *         </ul>
+   */
+  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
+    long expectedTxId = sinceTxId + 1;
+
+    List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
+
+    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
+    if (logGroups.size() > tailGroups.size()) {
+      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) +
+          " groups of logs because they start with a txid less than image " +
+          "txid " + sinceTxId);
+    }
+
+    // headMap(maxStartTxId) would throw if maxStartTxId < expectedTxId, so an
+    // empty map is used in that case.
+    SortedMap<Long, LogGroup> usefulGroups;
+    if (maxStartTxId > sinceTxId) {
+      usefulGroups = tailGroups.headMap(maxStartTxId);
+    } else {
+      usefulGroups = new TreeMap<Long, LogGroup>();
+    }
+
+    // usefulGroups is a sub-map of tailGroups, so it can only be smaller.
+    if (usefulGroups.size() < tailGroups.size()) {
+      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
+          " groups of logs because they start with a txid at or above max " +
+          "start txid " + maxStartTxId);
+    }
+
+    for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
+      long logStartTxId = entry.getKey();
+      LogGroup logGroup = entry.getValue();
+
+      logGroup.planRecovery();
+
+      if (expectedTxId != FSConstants.INVALID_TXID && logStartTxId != expectedTxId) {
+        throw new IOException("Expected next log group would start at txid " +
+            expectedTxId + " but starts at txid " + logStartTxId);
+      }
+
+      if (logGroup.hasNonCorruptLog()) {
+        // We can pick any of the non-corrupt logs here
+        recoveryLogs.add(logGroup.getBestNonCorruptLog());
+      } else {
+        // Only in-progress logs are ever marked corrupt. A group with no
+        // usable log therefore holds nothing but in-progress logs that have
+        // no valid transactions, and there is nothing to load from it. It
+        // stays in usefulGroups so that recovery moves its files aside.
+        LOG.warn("No non-corrupt logs for txid " + logStartTxId +
+            "; they will be moved aside during recovery");
+      }
+
+      // If this log group was finalized, we know to expect the next
+      // log group to start at the following txid (ie no gaps)
+      if (logGroup.hasKnownLastTxId()) {
+        expectedTxId = logGroup.getLastTxId() + 1;
+      } else {
+        // the log group was in-progress so we don't know what ID
+        // the next group should start from.
+        expectedTxId = FSConstants.INVALID_TXID;
+      }
+    }
+
+    // A seen_txid beyond the newest group's start means a later segment was
+    // started but its logs were not found.
+    long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
+        0 : usefulGroups.lastKey();
+    if (maxSeenTxId > sinceTxId &&
+        maxSeenTxId > lastLogGroupStartTxId) {
+      String msg = "At least one storage directory indicated it has seen a " +
+          "log segment starting at txid " + maxSeenTxId;
+      if (usefulGroups.isEmpty()) {
+        msg += " but there are no logs to load.";
+      } else {
+        msg += " but the most recent log file found starts with txid " +
+            lastLogGroupStartTxId;
+      }
+      throw new IOException(msg);
+    }
+
+    return new LogLoadPlan(recoveryLogs,
+        Lists.newArrayList(usefulGroups.values()));
+  }
+
+  @Override
+  public boolean needToSave() {
+    return needToSave;
+  }
+
+  /**
+   * Lists the finalized log segments whose last txid is at least
+   * {@code sinceTxId}. Groups that contain only in-progress logs are
+   * omitted.
+   */
+  RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
+    List<RemoteEditLog> logs = Lists.newArrayList();
+    for (LogGroup g : logGroups.values()) {
+      if (!g.hasFinalized) continue;
+
+      FoundEditLog fel = g.getBestNonCorruptLog();
+      if (fel.getLastTxId() < sinceTxId) continue;
+
+      logs.add(new RemoteEditLog(fel.getStartTxId(),
+          fel.getLastTxId()));
+    }
+
+    return new RemoteEditLogManifest(logs);
+  }
+
+  /**
+   * A group of logs that all start at the same txid.
+   *
+   * Handles determining which logs are corrupt and which should be considered
+   * candidates for loading.
+   */
+  static class LogGroup {
+    long startTxId;
+    List<FoundEditLog> logs = new ArrayList<FoundEditLog>();
+    /** Distinct end txids of the finalized logs in this group. */
+    private Set<Long> endTxIds = new TreeSet<Long>();
+    private boolean hasInProgress = false;
+    private boolean hasFinalized = false;
+
+    LogGroup(long startTxId) {
+      this.startTxId = startTxId;
+    }
+
+    /**
+     * @return true if at least one log in this group is not marked corrupt
+     */
+    boolean hasNonCorruptLog() {
+      for (FoundEditLog log : logs) {
+        if (!log.isCorrupt()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * @return a non-corrupt finalized log if there is one, otherwise a
+     *         non-corrupt in-progress log
+     * @throws IllegalStateException if every log in the group is marked
+     *         corrupt. {@link #planRecovery()} can cause this for a group of
+     *         in-progress logs with no valid transactions, so check
+     *         {@link #hasNonCorruptLog()} first when that is possible.
+     */
+    FoundEditLog getBestNonCorruptLog() {
+      // First look for non-corrupt finalized logs
+      for (FoundEditLog log : logs) {
+        if (!log.isCorrupt() && !log.isInProgress()) {
+          return log;
+        }
+      }
+      // Then look for non-corrupt in-progress logs
+      for (FoundEditLog log : logs) {
+        if (!log.isCorrupt()) {
+          return log;
+        }
+      }
+
+      throw new IllegalStateException(
+          "No non-corrupt logs for txid " + startTxId);
+    }
+
+    /**
+     * @return true if we can determine the last txid in this log group,
+     *         ie it contains at least one finalized log.
+     */
+    boolean hasKnownLastTxId() {
+      for (FoundEditLog log : logs) {
+        if (!log.isInProgress()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * @return the last txid included in the logs in this group
+     * @throws IllegalStateException if it is unknown -
+     *         see {@link #hasKnownLastTxId()}
+     */
+    long getLastTxId() {
+      for (FoundEditLog log : logs) {
+        if (!log.isInProgress()) {
+          return log.lastTxId;
+        }
+      }
+      throw new IllegalStateException("LogGroup only has in-progress logs");
+    }
+
+    /** Adds a log that starts at this group's txid. */
+    void add(FoundEditLog log) {
+      assert log.getStartTxId() == startTxId;
+      logs.add(log);
+
+      if (log.isInProgress()) {
+        hasInProgress = true;
+      } else {
+        hasFinalized = true;
+        endTxIds.add(log.lastTxId);
+      }
+    }
+
+    /**
+     * Marks the logs of this group that should not be loaded as corrupt.
+     * Nothing is renamed yet; that happens in {@link #recover()}.
+     *
+     * @throws IOException if finalized logs disagree on their end txid, or
+     *         a log cannot be validated
+     */
+    void planRecovery() throws IOException {
+      assert hasInProgress || hasFinalized;
+
+      checkConsistentEndTxIds();
+
+      if (hasFinalized && hasInProgress) {
+        planMixedLogRecovery();
+      } else if (!hasFinalized && hasInProgress) {
+        planAllInProgressRecovery();
+      } else if (hasFinalized && !hasInProgress) {
+        LOG.debug("No recovery necessary for logs starting at txid " +
+            startTxId);
+      }
+    }
+
+    /**
+     * Recovery case for when some logs in the group were in-progress, and
+     * others were finalized. This happens when one of the storage
+     * directories fails.
+     *
+     * The in-progress logs in this case should be considered corrupt.
+     */
+    private void planMixedLogRecovery() throws IOException {
+      for (FoundEditLog log : logs) {
+        if (log.isInProgress()) {
+          LOG.warn("Log at " + log.getFile() + " is in progress, but " +
+              "other logs starting at the same txid " + startTxId +
+              " are finalized. Moving aside.");
+          log.markCorrupt();
+        }
+      }
+    }
+
+    /**
+     * Recovery case for when all of the logs in the group were in progress.
+     * This happens if the NN completely crashes and restarts. In this case
+     * we count the valid transactions in each log. A log with fewer than the
+     * maximum count is considered corrupt, and so is any log with none at
+     * all.
+     */
+    private void planAllInProgressRecovery() throws IOException {
+      // We only have in-progress logs. We need to figure out which logs have
+      // the latest data to recover them
+      LOG.warn("Logs beginning at txid " + startTxId + " are all " +
+          "in-progress (probably truncated due to a previous NameNode " +
+          "crash)");
+      if (logs.size() == 1) {
+        // Only one log, it's our only choice!
+        FoundEditLog log = logs.get(0);
+        if (log.validateLog().numTransactions == 0) {
+          // If it has no transactions, we should consider it corrupt just
+          // to be conservative.
+          // See comment below for similar case
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+              "it has no transactions in it.");
+          log.markCorrupt();
+        }
+        return;
+      }
+
+      long maxValidTxnCount = Long.MIN_VALUE;
+      for (FoundEditLog log : logs) {
+        long validTxnCount = log.validateLog().numTransactions;
+        LOG.warn(" Log " + log.getFile() +
+            " valid txns=" + validTxnCount +
+            " valid len=" + log.validateLog().validLength);
+        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
+      }
+
+      for (FoundEditLog log : logs) {
+        long txns = log.validateLog().numTransactions;
+        if (txns < maxValidTxnCount) {
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+              "it has only " + txns + " valid txns whereas another " +
+              "log has " + maxValidTxnCount);
+          log.markCorrupt();
+        } else if (txns == 0) {
+          // this can happen if the NN crashes right after rolling a log
+          // but before the START_LOG_SEGMENT txn is written. Since the log
+          // is empty, we can just move it aside to its corrupt name.
+          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
+              "it has no transactions in it.");
+          log.markCorrupt();
+        }
+      }
+    }
+
+    /**
+     * Check for the case when we have multiple finalized logs and they have
+     * different ending transaction IDs. This violates an invariant that all
+     * log directories should roll together. We should abort in this case.
+     */
+    private void checkConsistentEndTxIds() throws IOException {
+      if (hasFinalized && endTxIds.size() > 1) {
+        throw new IOException("More than one ending txid was found " +
+            "for logs starting at txid " + startTxId + ". " +
+            "Found: " + StringUtils.join(endTxIds, ','));
+      }
+    }
+
+    /**
+     * Applies the plan from {@link #planRecovery()}. Corrupt logs are renamed
+     * aside, and the remaining in-progress logs are finalized.
+     */
+    void recover() throws IOException {
+      for (FoundEditLog log : logs) {
+        if (log.isCorrupt()) {
+          log.moveAsideCorruptFile();
+        } else if (log.isInProgress()) {
+          log.finalizeLog();
+        }
+      }
+    }
+  }
+
+  /**
+   * Record of an image that has been located and had its filename parsed.
+   */
+  static class FoundFSImage {
+    final StorageDirectory sd;
+    final long txId;
+    private final File file;
+
+    FoundFSImage(StorageDirectory sd, File file, long txId) {
+      assert txId >= 0 : "Invalid txid on " + file +": " + txId;
+
+      this.sd = sd;
+      this.txId = txId;
+      this.file = file;
+    }
+
+    File getFile() {
+      return file;
+    }
+
+    public long getTxId() {
+      return txId;
+    }
+
+    @Override
+    public String toString() {
+      return file.toString();
+    }
+  }
+
+  /**
+   * Record of an edit log that has been located and had its filename parsed.
+   */
+  static class FoundEditLog {
+    File file;
+    final long startTxId;
+    /** Last txid, or {@link #UNKNOWN_END} while the log is in progress. */
+    long lastTxId;
+
+    private EditLogValidation cachedValidation = null;
+    private boolean isCorrupt = false;
+
+    static final long UNKNOWN_END = -1;
+
+    FoundEditLog(File file,
+        long startTxId, long endTxId) {
+      assert endTxId == UNKNOWN_END || endTxId >= startTxId;
+      assert startTxId > 0;
+      assert file != null;
+
+      this.startTxId = startTxId;
+      this.lastTxId = endTxId;
+      this.file = file;
+    }
+
+    /**
+     * Renames this in-progress log to its finalized name. The last txid is
+     * derived from the number of valid transactions found by
+     * {@link #validateLog()}.
+     *
+     * NOTE(review): a log with zero valid transactions would get a last txid
+     * of startTxId - 1. LogGroup.planAllInProgressRecovery marks such logs
+     * corrupt, so LogGroup.recover() moves them aside instead of calling this.
+     *
+     * @throws IOException if validation or the rename fails
+     */
+    public void finalizeLog() throws IOException {
+      long numTransactions = validateLog().numTransactions;
+      long newLastTxId = startTxId + numTransactions - 1;
+      File dst = new File(file.getParentFile(),
+          NNStorage.getFinalizedEditsFileName(startTxId, newLastTxId));
+      LOG.info("Finalizing edits log " + file + " by renaming to "
+          + dst.getName());
+      if (!file.renameTo(dst)) {
+        throw new IOException("Couldn't finalize log " +
+            file + " to " + dst);
+      }
+      this.lastTxId = newLastTxId;
+      file = dst;
+    }
+
+    long getStartTxId() {
+      return startTxId;
+    }
+
+    long getLastTxId() {
+      return lastTxId;
+    }
+
+    /** Validates the log file once and caches the result. */
+    EditLogValidation validateLog() throws IOException {
+      if (cachedValidation == null) {
+        cachedValidation = FSEditLogLoader.validateEditLog(file);
+      }
+      return cachedValidation;
+    }
+
+    boolean isInProgress() {
+      return (lastTxId == UNKNOWN_END);
+    }
+
+    File getFile() {
+      return file;
+    }
+
+    void markCorrupt() {
+      isCorrupt = true;
+    }
+
+    boolean isCorrupt() {
+      return isCorrupt;
+    }
+
+    /** Renames this (corrupt) log to the same name with ".corrupt" appended. */
+    void moveAsideCorruptFile() throws IOException {
+      assert isCorrupt;
+
+      File src = file;
+      File dst = new File(src.getParent(), src.getName() + ".corrupt");
+      boolean success = src.renameTo(dst);
+      if (!success) {
+        throw new IOException(
+            "Couldn't rename corrupt log " + src + " to " + dst);
+      }
+      file = dst;
+    }
+
+    @Override
+    public String toString() {
+      return file.toString();
+    }
+  }
+
+  /** Load plan made of one image plus the edit logs that follow it. */
+  static class TransactionalLoadPlan extends LoadPlan {
+    final FoundFSImage image;
+    final LogLoadPlan logPlan;
+
+    public TransactionalLoadPlan(FoundFSImage image,
+        LogLoadPlan logPlan) {
+      super();
+      this.image = image;
+      this.logPlan = logPlan;
+    }
+
+    /**
+     * Runs log recovery and always returns false. See LoadPlan for the
+     * meaning of the return value.
+     */
+    @Override
+    boolean doRecovery() throws IOException {
+      logPlan.doRecovery();
+      return false;
+    }
+
+    @Override
+    File getImageFile() {
+      return image.getFile();
+    }
+
+    @Override
+    List<File> getEditsFiles() {
+      return logPlan.getEditsFiles();
+    }
+
+    @Override
+    StorageDirectory getStorageDirectoryForProperties() {
+      return image.sd;
+    }
+  }
+
+  /** The edit logs chosen for loading, plus the groups to recover first. */
+  static class LogLoadPlan {
+    final List<FoundEditLog> editLogs;
+    final List<LogGroup> logGroupsToRecover;
+
+    LogLoadPlan(List<FoundEditLog> editLogs,
+        List<LogGroup> logGroupsToRecover) {
+      this.editLogs = editLogs;
+      this.logGroupsToRecover = logGroupsToRecover;
+    }
+
+    /** Calls {@link LogGroup#recover()} on every planned group. */
+    public void doRecovery() throws IOException {
+      for (LogGroup g : logGroupsToRecover) {
+        g.recover();
+      }
+    }
+
+    /**
+     * @return the current file of each chosen log, in load order. Call after
+     *         {@link #doRecovery()} so that finalized names are reflected.
+     */
+    public List<File> getEditsFiles() {
+      List<File> ret = new ArrayList<File>();
+      for (FoundEditLog log : editLogs) {
+        ret.add(log.getFile());
+      }
+      return ret;
+    }
+  }
+}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 29 16:28:45 2011
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -124,6 +125,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
@@ -323,8 +325,7 @@ public class FSNamesystem implements FSC
if(fsImage == null) {
this.dir = new FSDirectory(this, conf);
StartupOption startOpt = NameNode.getStartupOption(conf);
- this.dir.loadFSImage(getNamespaceDirs(conf),
- getNamespaceEditsDirs(conf), startOpt);
+ this.dir.loadFSImage(startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().setFsImageLoadTime(
@@ -392,8 +393,9 @@ public class FSNamesystem implements FSC
+ propertyName + "\" in hdfs-site.xml;" +
"\n\t\t- use Backup Node as a persistent and up-to-date storage " +
"of the file system meta-data.");
- } else if (dirNames.isEmpty())
- dirNames.add("file:///tmp/hadoop/dfs/name");
+ } else if (dirNames.isEmpty()) {
+ dirNames = Collections.singletonList("file:///tmp/hadoop/dfs/name");
+ }
return Util.stringCollectionAsURIs(dirNames);
}
@@ -3258,7 +3260,7 @@ public class FSNamesystem implements FSC
throw new IOException("Safe mode should be turned ON " +
"in order to create namespace image.");
}
- getFSImage().saveNamespace(true);
+ getFSImage().saveNamespace();
LOG.info("New namespace image has been created.");
} finally {
readUnlock();
@@ -4003,8 +4005,8 @@ public class FSNamesystem implements FSC
}
}
- long getEditLogSize() throws IOException {
- return getEditLog().getEditLogSize();
+ public long getTransactionID() {
+ return getEditLog().getSyncTxId();
}
CheckpointSignature rollEditLog() throws IOException {
@@ -4019,24 +4021,9 @@ public class FSNamesystem implements FSC
writeUnlock();
}
}
-
- /**
- * Moves fsimage.ckpt to fsImage and edits.new to edits
- * Reopens the new edits file.
- *
- * @param sig the signature of this checkpoint (old image)
- */
- void rollFSImage(CheckpointSignature sig) throws IOException {
- writeLock();
- try {
- if (isInSafeMode()) {
- throw new SafeModeException("Image not rolled", safeMode);
- }
- LOG.info("Roll FSImage from " + Server.getRemoteAddress());
- getFSImage().rollFSImage(sig, true);
- } finally {
- writeUnlock();
- }
+
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
+ return getEditLog().getEditLogManifest(sinceTxId);
}
NamenodeCommand startCheckpoint(
@@ -4516,31 +4503,29 @@ public class FSNamesystem implements FSC
}
/**
- * Register a name-node.
- * <p>
- * Registration is allowed if there is no ongoing streaming to
- * another backup node.
- * We currently allow only one backup node, but multiple chackpointers
- * if there are no backups.
+ * Register a Backup name-node, verifying that it belongs
+ * to the correct namespace, and adding it to the set of
+ * active journals if necessary.
*
- * @param registration
- * @throws IOException
+ * @param bnReg registration of the new BackupNode
+ * @param nnReg registration of this NameNode
+ * @throws IOException if the namespace IDs do not match
*/
- void registerBackupNode(NamenodeRegistration registration)
- throws IOException {
+ void registerBackupNode(NamenodeRegistration bnReg,
+ NamenodeRegistration nnReg) throws IOException {
writeLock();
try {
if(getFSImage().getStorage().getNamespaceID()
- != registration.getNamespaceID())
+ != bnReg.getNamespaceID())
throw new IOException("Incompatible namespaceIDs: "
+ " Namenode namespaceID = "
+ getFSImage().getStorage().getNamespaceID() + "; "
- + registration.getRole() +
- " node namespaceID = " + registration.getNamespaceID());
- boolean regAllowed = getEditLog().checkBackupRegistration(registration);
- if(!regAllowed)
- throw new IOException("Registration is not allowed. " +
- "Another node is registered as a backup.");
+ + bnReg.getRole() +
+ " node namespaceID = " + bnReg.getNamespaceID());
+ if (bnReg.getRole() == NamenodeRole.BACKUP) {
+ getFSImage().getEditLog().registerBackupNode(
+ bnReg, nnReg);
+ }
} finally {
writeUnlock();
}
@@ -5080,4 +5065,6 @@ public class FSNamesystem implements FSC
void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
}
+
+
}
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Journal manager for the common case of edits files being written
+ * to a storage directory.
+ *
+ * Note: this class is not thread-safe and should be externally
+ * synchronized.
+ */
+class FileJournalManager implements JournalManager {
+  private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
+
+  /** Storage directory whose edits files this journal manages. */
+  private final StorageDirectory sd;
+  /** Buffer capacity passed to each new EditLogFileOutputStream. */
+  private int outputBufferCapacity = 512*1024;
+
+  /**
+   * @param sd storage directory in which edits files are created,
+   *           finalized and purged
+   */
+  public FileJournalManager(StorageDirectory sd) {
+    this.sd = sd;
+  }
+
+  /**
+   * Creates the in-progress edits file for a segment starting at
+   * {@code txid}.
+   *
+   * @return a stream on which create() has already been called
+   */
+  @Override
+  public EditLogOutputStream startLogSegment(long txid) throws IOException {
+    File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+    EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+        outputBufferCapacity);
+    stm.create();
+    return stm;
+  }
+
+  /**
+   * Renames the in-progress edits file of the segment starting at
+   * {@code firstTxId} to the finalized name for firstTxId..lastTxId.
+   *
+   * @throws IllegalStateException if the finalized file already exists
+   * @throws IOException if the rename fails
+   */
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+    File inprogressFile = NNStorage.getInProgressEditsFile(
+        sd, firstTxId);
+    File dstFile = NNStorage.getFinalizedEditsFile(
+        sd, firstTxId, lastTxId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
+    }
+
+    // Never overwrite an existing finalized segment. The template form
+    // builds the message only when the check fails.
+    Preconditions.checkState(!dstFile.exists(),
+        "Can't finalize edits file %s since finalized file %s " +
+        "already exists", inprogressFile, dstFile);
+    if (!inprogressFile.renameTo(dstFile)) {
+      throw new IOException("Unable to finalize edits file " + inprogressFile +
+          " to " + dstFile);
+    }
+  }
+
+  @VisibleForTesting
+  public StorageDirectory getStorageDirectory() {
+    return sd;
+  }
+
+  @Override
+  public String toString() {
+    return "FileJournalManager for storage directory " + sd;
+  }
+
+  /** Sets the buffer capacity used by segments started after this call. */
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    this.outputBufferCapacity = size;
+  }
+
+  /**
+   * Hands to {@code purger} every edit log in the current directory whose
+   * start txid and recorded last txid are both below {@code minTxIdToKeep}.
+   *
+   * NOTE(review): in-progress logs are recorded with a last txid of
+   * FoundEditLog.UNKNOWN_END (-1), so the second test always passes for
+   * them. They are purged whenever they start below minTxIdToKeep. Confirm
+   * that such logs are always stale at that point.
+   */
+  @Override
+  public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+      throws IOException {
+    File[] files = FileUtil.listFiles(sd.getCurrentDir());
+    List<FoundEditLog> editLogs =
+        FSImageTransactionalStorageInspector.matchEditLogs(files);
+    for (FoundEditLog log : editLogs) {
+      if (log.getStartTxId() < minTxIdToKeep &&
+          log.getLastTxId() < minTxIdToKeep) {
+        purger.purgeLog(log);
+      }
+    }
+  }
+
+  /**
+   * Opens the in-progress edits file of the segment starting at
+   * {@code segmentStartsAtTxId} for reading.
+   */
+  @Override
+  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+      throws IOException {
+    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
+    return new EditLogFileInputStream(f);
+  }
+
+}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Fri Jul 29 16:28:45 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.na
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
+import java.net.InetSocketAddress;
+
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -34,11 +36,16 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Preconditions;
+
/**
* This class is used in Namesystem's jetty to retrieve a file.
* Typically used by the Secondary NameNode to retrieve image and
@@ -50,15 +57,21 @@ public class GetImageServlet extends Htt
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
- @SuppressWarnings("unchecked")
+  /** Query parameter carrying the txid of the image to get or put. */
+  private static final String TXID_PARAM = "txid";
+  /** Query parameter carrying the first txid of the edit segment to get. */
+  private static final String START_TXID_PARAM = "startTxId";
+  /** Query parameter carrying the last txid of the edit segment to get. */
+  private static final String END_TXID_PARAM = "endTxId";
+  /** Query parameter carrying the requester's colon-separated storage info. */
+  private static final String STORAGEINFO_PARAM = "storageInfo";
+
+  /**
+   * Transaction IDs of checkpoints whose putimage upload is in progress;
+   * used to reject a second concurrent upload for the same txid.
+   */
+  private static Set<Long> currentlyDownloadingCheckpoints =
+    Collections.synchronizedSet(new HashSet<Long>());
+
public void doGet(final HttpServletRequest request,
final HttpServletResponse response
) throws ServletException, IOException {
- Map<String,String[]> pmap = request.getParameterMap();
try {
ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
- final TransferFsImage ff = new TransferFsImage(pmap, request, response);
+ final GetImageParams parsedParams = new GetImageParams(request, response);
final Configuration conf =
(Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
@@ -70,45 +83,77 @@ public class GetImageServlet extends Htt
+ request.getRemoteHost());
return;
}
-
+
+ String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
+ String theirStorageInfoString = parsedParams.getStorageInfoString();
+ if (theirStorageInfoString != null &&
+ !myStorageInfoString.equals(theirStorageInfoString)) {
+ response.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "This namenode has storage info " + myStorageInfoString +
+ " but the secondary expected " + theirStorageInfoString);
+ LOG.warn("Received an invalid request file transfer request " +
+ "from a secondary with storage info " + theirStorageInfoString);
+ return;
+ }
+
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- if (ff.getImage()) {
- response.setHeader(TransferFsImage.CONTENT_LENGTH,
- String.valueOf(nnImage.getStorage()
- .getFsImageName().length()));
+ if (parsedParams.isGetImage()) {
+ long txid = parsedParams.getTxId();
+ File imageFile = nnImage.getStorage().getFsImageName(txid);
+ if (imageFile == null) {
+ throw new IOException("Could not find image with txid " + txid);
+ }
+ setVerificationHeaders(response, imageFile);
// send fsImage
- TransferFsImage.getFileServer(response.getOutputStream(),
- nnImage.getStorage().getFsImageName(),
+ TransferFsImage.getFileServer(response.getOutputStream(), imageFile,
getThrottler(conf));
- } else if (ff.getEdit()) {
- response.setHeader(TransferFsImage.CONTENT_LENGTH,
- String.valueOf(nnImage.getStorage()
- .getFsEditName().length()));
+ } else if (parsedParams.isGetEdit()) {
+ long startTxId = parsedParams.getStartTxId();
+ long endTxId = parsedParams.getEndTxId();
+
+ File editFile = nnImage.getStorage()
+ .findFinalizedEditsFile(startTxId, endTxId);
+ setVerificationHeaders(response, editFile);
+
// send edits
- TransferFsImage.getFileServer(response.getOutputStream(),
- nnImage.getStorage().getFsEditName(),
+ TransferFsImage.getFileServer(response.getOutputStream(), editFile,
getThrottler(conf));
- } else if (ff.putImage()) {
- // issue a HTTP get request to download the new fsimage
- nnImage.validateCheckpointUpload(ff.getToken());
- nnImage.newImageDigest = ff.getNewChecksum();
- MD5Hash downloadImageDigest = reloginIfNecessary().doAs(
- new PrivilegedExceptionAction<MD5Hash>() {
- @Override
- public MD5Hash run() throws Exception {
- return TransferFsImage.getFileClient(
- ff.getInfoServer(), "getimage=1",
- nnImage.getStorage().getFsImageNameCheckpoint(), true);
- }
- });
- if (!nnImage.newImageDigest.equals(downloadImageDigest)) {
- throw new IOException("The downloaded image is corrupt," +
- " expecting a checksum " + nnImage.newImageDigest +
- " but received a checksum " + downloadImageDigest);
+ } else if (parsedParams.isPutImage()) {
+ final long txid = parsedParams.getTxId();
+
+ if (! currentlyDownloadingCheckpoints.add(txid)) {
+ throw new IOException(
+ "Another checkpointer is already in the process of uploading a" +
+ " checkpoint made at transaction ID " + txid);
+ }
+
+ try {
+ if (nnImage.getStorage().findImageFile(txid) != null) {
+ throw new IOException(
+ "Another checkpointer already uploaded an checkpoint " +
+ "for txid " + txid);
+ }
+
+ // issue a HTTP get request to download the new fsimage
+ MD5Hash downloadImageDigest = reloginIfNecessary().doAs(
+ new PrivilegedExceptionAction<MD5Hash>() {
+ @Override
+ public MD5Hash run() throws Exception {
+ return TransferFsImage.downloadImageToStorage(
+ parsedParams.getInfoServer(), txid,
+ nnImage.getStorage(), true);
+ }
+ });
+ nnImage.saveDigestAndRenameCheckpointImage(txid, downloadImageDigest);
+
+ // Now that we have a new checkpoint, we might be able to
+ // remove some old ones.
+ nnImage.purgeOldStorage();
+ } finally {
+ currentlyDownloadingCheckpoints.remove(txid);
}
- nnImage.checkpointUploadDone();
}
return null;
}
@@ -182,4 +227,148 @@ public class GetImageServlet extends Htt
if(LOG.isDebugEnabled()) LOG.debug("isValidRequestor is rejecting: " + remoteUser);
return false;
}
+
+  /**
+   * Set the Content-Length header for {@code file} and, when
+   * {@link MD5FileUtils#readStoredMd5ForFile} returns a stored digest for it,
+   * the MD5 header as well.
+   *
+   * @param response the response whose headers are set
+   * @param file the file about to be served
+   * @throws IOException propagated from reading the stored MD5
+   */
+  private void setVerificationHeaders(HttpServletResponse response, File file)
+      throws IOException {
+    String contentLength = String.valueOf(file.length());
+    response.setHeader(TransferFsImage.CONTENT_LENGTH, contentLength);
+    MD5Hash storedDigest = MD5FileUtils.readStoredMd5ForFile(file);
+    if (storedDigest == null) {
+      return;
+    }
+    response.setHeader(TransferFsImage.MD5_HEADER, storedDigest.toString());
+  }
+
+  /**
+   * Build the query string (without a leading '?') that asks this servlet
+   * for the image at {@code txid}.
+   *
+   * @param txid transaction ID of the requested image
+   * @param remoteStorageInfo storage info the serving side compares against
+   *        its own before answering
+   * @return the query string
+   */
+  static String getParamStringForImage(long txid,
+      StorageInfo remoteStorageInfo) {
+    String storageInfo = remoteStorageInfo.toColonSeparatedString();
+    return "getimage=1&" + TXID_PARAM + "=" + txid
+        + "&" + STORAGEINFO_PARAM + "=" + storageInfo;
+  }
+
+  /**
+   * Build the query string (without a leading '?') that asks this servlet
+   * for the finalized edit log segment described by {@code log}.
+   *
+   * @param log the segment to fetch, identified by its start and end txids
+   * @param remoteStorageInfo storage info the serving side compares against
+   *        its own before answering
+   * @return the query string
+   */
+  static String getParamStringForLog(RemoteEditLog log,
+      StorageInfo remoteStorageInfo) {
+    StringBuilder sb = new StringBuilder("getedit=1");
+    sb.append('&').append(START_TXID_PARAM).append('=')
+        .append(log.getStartTxId());
+    sb.append('&').append(END_TXID_PARAM).append('=')
+        .append(log.getEndTxId());
+    sb.append('&').append(STORAGEINFO_PARAM).append('=')
+        .append(remoteStorageInfo.toColonSeparatedString());
+    return sb.toString();
+  }
+
+  /**
+   * Build the query string (without a leading '?') that tells this servlet
+   * to download a new checkpoint image from the caller's image server.
+   *
+   * @param txid transaction ID of the checkpoint being uploaded
+   * @param imageListenAddress host and port the image can be fetched from
+   * @param storage storage whose colon-separated info is sent for
+   *        verification by the serving side
+   * @return the query string
+   */
+  static String getParamStringToPutImage(long txid,
+      InetSocketAddress imageListenAddress, NNStorage storage) {
+    StringBuilder sb = new StringBuilder("putimage=1");
+    sb.append('&').append(TXID_PARAM).append('=').append(txid);
+    sb.append("&port=").append(imageListenAddress.getPort());
+    sb.append("&machine=").append(imageListenAddress.getHostName());
+    sb.append('&').append(STORAGEINFO_PARAM).append('=')
+        .append(storage.toColonSeparatedString());
+    return sb.toString();
+  }
+
+
+ static class GetImageParams {
+ private boolean isGetImage;
+ private boolean isGetEdit;
+ private boolean isPutImage;
+ private int remoteport;
+ private String machineName;
+ private long startTxId, endTxId, txId;
+ private String storageInfoString;
+
+ /**
+ * @param request the object from which this servlet reads the url contents
+ * @param response the object into which this servlet writes the url contents
+ * @throws IOException if the request is bad
+ */
+ public GetImageParams(HttpServletRequest request,
+ HttpServletResponse response
+ ) throws IOException {
+ @SuppressWarnings("unchecked")
+ Map<String, String[]> pmap = request.getParameterMap();
+ isGetImage = isGetEdit = isPutImage = false;
+ remoteport = 0;
+ machineName = null;
+
+ for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
+ String key = entry.getKey();
+ String[] val = entry.getValue();
+ if (key.equals("getimage")) {
+ isGetImage = true;
+ txId = parseLongParam(request, TXID_PARAM);
+ } else if (key.equals("getedit")) {
+ isGetEdit = true;
+ startTxId = parseLongParam(request, START_TXID_PARAM);
+ endTxId = parseLongParam(request, END_TXID_PARAM);
+ } else if (key.equals("putimage")) {
+ isPutImage = true;
+ txId = parseLongParam(request, TXID_PARAM);
+ } else if (key.equals("port")) {
+ remoteport = new Integer(val[0]).intValue();
+ } else if (key.equals("machine")) {
+ machineName = val[0];
+ } else if (key.equals(STORAGEINFO_PARAM)) {
+ storageInfoString = val[0];
+ }
+ }
+
+ int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
+ if ((numGets > 1) || (numGets == 0) && !isPutImage) {
+ throw new IOException("Illegal parameters to TransferFsImage");
+ }
+ }
+
+ public String getStorageInfoString() {
+ return storageInfoString;
+ }
+
+ public long getTxId() {
+ Preconditions.checkState(isGetImage || isPutImage);
+ return txId;
+ }
+
+ public long getStartTxId() {
+ Preconditions.checkState(isGetEdit);
+ return startTxId;
+ }
+
+ public long getEndTxId() {
+ Preconditions.checkState(isGetEdit);
+ return endTxId;
+ }
+
+ boolean isGetEdit() {
+ return isGetEdit;
+ }
+
+ boolean isGetImage() {
+ return isGetImage;
+ }
+
+ boolean isPutImage() {
+ return isPutImage;
+ }
+
+ String getInfoServer() throws IOException{
+ if (machineName == null || remoteport == 0) {
+ throw new IOException ("MachineName and port undefined");
+ }
+ return machineName + ":" + remoteport;
+ }
+
+ private static long parseLongParam(HttpServletRequest request, String param)
+ throws IOException {
+ // Parse the 'txid' parameter which indicates which image is to be
+ // fetched.
+ String paramStr = request.getParameter(param);
+ if (paramStr == null) {
+ throw new IOException("Invalid request has no " + param + " parameter");
+ }
+
+ return Long.valueOf(paramStr);
+ }
+ }
}
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
+
+/**
+ * A JournalManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
+interface JournalManager {
+  /**
+   * Begin writing to a new segment of the log stream, which starts at
+   * the given transaction ID.
+   *
+   * @param txId the transaction ID of the first edit in the new segment
+   * @return a stream to which the new segment's edits are written
+   * @throws IOException if the segment cannot be started
+   */
+  EditLogOutputStream startLogSegment(long txId) throws IOException;
+
+  /**
+   * Mark the log segment that spans from firstTxId to lastTxId
+   * as finalized and complete.
+   *
+   * @param firstTxId the transaction ID of the first edit in the segment
+   * @param lastTxId the transaction ID of the last edit in the segment
+   * @throws IOException if the segment cannot be finalized
+   */
+  void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
+
+  /**
+   * Set the amount of memory that this journal's output streams should use
+   * to buffer edits.
+   *
+   * @param size the requested buffer capacity
+   */
+  void setOutputBufferCapacity(int size);
+
+  /**
+   * The JournalManager may archive/purge any logs that contain only
+   * transactions strictly less than {@code minTxIdToKeep}. Every transaction
+   * with an ID greater than or equal to {@code minTxIdToKeep} must remain
+   * available after purging.
+   *
+   * @param minTxIdToKeep the earliest txid that must be retained after purging
+   *                      old logs
+   * @param purger the purging implementation to use
+   * @throws IOException if purging fails
+   */
+  void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+    throws IOException;
+
+  /**
+   * Open a stream over the log segment the edit log is currently writing.
+   *
+   * @param segmentStartsAtTxId the first transaction ID of the in-progress
+   *                            segment to read
+   * @return an EditLogInputStream that reads from the same log that
+   *         the edit log is currently writing. May return null if this
+   *         journal manager does not support this operation.
+   * @throws IOException if the stream cannot be opened
+   */
+  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+    throws IOException;
+}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Jul 29 16:28:45 2011
@@ -17,22 +17,20 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-
+import java.io.BufferedReader;
import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
+import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.io.OutputStream;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -51,13 +49,17 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
-import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.DNS;
+import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* NNStorage is responsible for management of the StorageDirectories used by
* the NameNode.
@@ -66,17 +68,19 @@ import org.apache.hadoop.net.DNS;
public class NNStorage extends Storage implements Closeable {
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
- static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
-
+ static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
+
//
// The filenames used for storing the images
//
enum NameNodeFile {
IMAGE ("fsimage"),
- TIME ("fstime"),
+ TIME ("fstime"), // from "old" pre-HDFS-1073 format
+ SEEN_TXID ("seen_txid"),
EDITS ("edits"),
IMAGE_NEW ("fsimage.ckpt"),
- EDITS_NEW ("edits.new");
+ EDITS_NEW ("edits.new"), // from "old" pre-HDFS-1073 format
+ EDITS_INPROGRESS ("edits_inprogress");
private String fileName = null;
private NameNodeFile(String name) { this.fileName = name; }
@@ -106,42 +110,9 @@ public class NNStorage extends Storage i
}
}
- /**
- * Interface to be implemented by classes which make use of storage
- * directories. They are notified when a StorageDirectory is causing errors,
- * becoming available or being formatted.
- *
- * This allows the implementors of the interface take their own specific
- * action on the StorageDirectory when this occurs.
- */
- interface NNStorageListener {
- /**
- * An error has occurred with a StorageDirectory.
- * @param sd The storage directory causing the error.
- * @throws IOException
- */
- void errorOccurred(StorageDirectory sd) throws IOException;
-
- /**
- * A storage directory has been formatted.
- * @param sd The storage directory being formatted.
- * @throws IOException
- */
- void formatOccurred(StorageDirectory sd) throws IOException;
-
- /**
- * A storage directory is now available use.
- * @param sd The storage directory which has become available.
- * @throws IOException
- */
- void directoryAvailable(StorageDirectory sd) throws IOException;
- }
-
- final private List<NNStorageListener> listeners;
private UpgradeManager upgradeManager = null;
- protected MD5Hash imageDigest = null;
protected String blockpoolID = ""; // id of the block pool
-
+
/**
* flag that controls if we try to restore failed storages
*/
@@ -149,7 +120,13 @@ public class NNStorage extends Storage i
private Object restorationLock = new Object();
private boolean disablePreUpgradableLayoutCheck = false;
- private long checkpointTime = -1L; // The age of the image
+
+ /**
+ * TxId of the last transaction that was included in the most
+ * recent fsimage file. This does not include any transactions
+ * that have since been written to the edit log.
+ */
+ protected long mostRecentCheckpointTxId = FSConstants.INVALID_TXID;
/**
* list of failed (and thus removed) storages
@@ -158,27 +135,26 @@ public class NNStorage extends Storage i
= new CopyOnWriteArrayList<StorageDirectory>();
/**
- * Construct the NNStorage.
- * @param conf Namenode configuration.
+ * Properties from old layout versions that may be needed
+ * during upgrade only.
*/
- public NNStorage(Configuration conf) {
- super(NodeType.NAME_NODE);
-
- storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
- this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
- }
+ private HashMap<String, String> deprecatedProperties;
/**
* Construct the NNStorage.
- * @param storageInfo storage information
- * @param bpid block pool Id
+ * @param conf Namenode configuration.
+ * @param imageDirs Directories the image can be stored in.
+ * @param editsDirs Directories the editlog can be stored in.
+ * @throws IOException if any directories are inaccessible.
*/
- public NNStorage(StorageInfo storageInfo, String bpid) {
- super(NodeType.NAME_NODE, storageInfo);
+ public NNStorage(Configuration conf,
+ Collection<URI> imageDirs, Collection<URI> editsDirs)
+ throws IOException {
+ super(NodeType.NAME_NODE);
storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
- this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
- this.blockpoolID = bpid;
+
+ setStorageDirectories(imageDirs, editsDirs);
}
@Override // Storage
@@ -207,7 +183,6 @@ public class NNStorage extends Storage i
@Override // Closeable
public void close() throws IOException {
- listeners.clear();
unlockAll();
storageDirs.clear();
}
@@ -232,10 +207,7 @@ public class NNStorage extends Storage i
/**
* See if any of removed storages is "writable" again, and can be returned
- * into service. If saveNamespace is set, then this method is being
- * called from saveNamespace.
- *
- * @param saveNamespace Whether method is being called from saveNamespace()
+ * into service.
*/
void attemptRestoreRemovedStorage() {
// if directory is "alive" - copy the images there...
@@ -253,23 +225,10 @@ public class NNStorage extends Storage i
LOG.info("currently disabled dir " + root.getAbsolutePath() +
"; type="+sd.getStorageDirType()
+ ";canwrite="+root.canWrite());
- try {
-
- if(root.exists() && root.canWrite()) {
- // when we try to restore we just need to remove all the data
- // without saving current in-memory state (which could've changed).
- sd.clearDirectory();
-
- LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
- for (NNStorageListener listener : listeners) {
- listener.directoryAvailable(sd);
- }
-
- this.addStorageDir(sd); // restore
- this.removedStorageDirs.remove(sd);
- }
- } catch(IOException e) {
- LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
+ if(root.exists() && root.canWrite()) {
+ LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+ this.addStorageDir(sd); // restore
+ this.removedStorageDirs.remove(sd);
}
}
}
@@ -283,9 +242,11 @@ public class NNStorage extends Storage i
}
/**
- * Set the storage directories which will be used. NNStorage.close() should
- * be called before this to ensure any previous storage directories have been
- * freed.
+ * Set the storage directories which will be used. This should only ever be
+ * called from inside NNStorage. However, it needs to remain package private
+ * for testing, as StorageDirectories need to be reinitialised after using
+ * Mockito.spy() on this class, as Mockito doesn't work well with inner
+ * classes, such as StorageDirectory in this case.
*
* Synchronized due to initialization of storageDirs and removedStorageDirs.
*
@@ -293,6 +254,7 @@ public class NNStorage extends Storage i
* @param fsEditsDirs Locations to store edit logs.
* @throws IOException
*/
+ @VisibleForTesting
synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
Collection<URI> fsEditsDirs)
throws IOException {
@@ -411,110 +373,84 @@ public class NNStorage extends Storage i
}
return list;
}
-
+
/**
- * Determine the checkpoint time of the specified StorageDirectory
+ * Determine the last transaction ID noted in this storage directory.
+ * This txid is stored in a special seen_txid file since it might not
+ * correspond to the latest image or edit log. For example, an image-only
+ * directory will have this txid incremented when edits logs roll, even
+ * though the edits logs are in a different directory.
*
* @param sd StorageDirectory to check
- * @return If file exists and can be read, last checkpoint time. If not, 0L.
+ * @return If file exists and can be read, last recorded txid. If not, 0L.
* @throws IOException On errors processing file pointed to by sd
*/
- long readCheckpointTime(StorageDirectory sd) throws IOException {
- File timeFile = getStorageFile(sd, NameNodeFile.TIME);
- long timeStamp = 0L;
- if (timeFile.exists() && timeFile.canRead()) {
- DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+ static long readTransactionIdFile(StorageDirectory sd) throws IOException {
+ File txidFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
+ long txid = 0L;
+ if (txidFile.exists() && txidFile.canRead()) {
+ BufferedReader br = new BufferedReader(new FileReader(txidFile));
try {
- timeStamp = in.readLong();
+ txid = Long.valueOf(br.readLine());
} finally {
- in.close();
+ IOUtils.cleanup(LOG, br);
}
}
- return timeStamp;
+ return txid;
}
-
+
/**
* Write last checkpoint time into a separate file.
*
* @param sd
* @throws IOException
*/
- public void writeCheckpointTime(StorageDirectory sd) throws IOException {
- if (checkpointTime < 0L)
- return; // do not write negative time
- File timeFile = getStorageFile(sd, NameNodeFile.TIME);
- if (timeFile.exists() && ! timeFile.delete()) {
- LOG.error("Cannot delete chekpoint time file: "
- + timeFile.getCanonicalPath());
- }
- FileOutputStream fos = new FileOutputStream(timeFile);
- DataOutputStream out = new DataOutputStream(fos);
+ void writeTransactionIdFile(StorageDirectory sd, long txid) throws IOException {
+ Preconditions.checkArgument(txid >= 0, "bad txid: " + txid);
+
+ File txIdFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
+ OutputStream fos = new AtomicFileOutputStream(txIdFile);
try {
- out.writeLong(checkpointTime);
- out.flush();
- fos.getChannel().force(true);
+ fos.write(String.valueOf(txid).getBytes());
+ fos.write('\n');
} finally {
- out.close();
+ IOUtils.cleanup(LOG, fos);
}
}
/**
- * Record new checkpoint time in order to
- * distinguish healthy directories from the removed ones.
- * If there is an error writing new checkpoint time, the corresponding
- * storage directory is removed from the list.
+ * Set the transaction ID of the last checkpoint
*/
- public void incrementCheckpointTime() {
- setCheckpointTimeInStorage(checkpointTime + 1);
+ void setMostRecentCheckpointTxId(long txid) {
+ this.mostRecentCheckpointTxId = txid;
}
/**
- * The age of the namespace state.<p>
- * Reflects the latest time the image was saved.
- * Modified with every save or a checkpoint.
- * Persisted in VERSION file.
- *
- * @return the current checkpoint time.
+ * Return the transaction ID of the last checkpoint.
*/
- public long getCheckpointTime() {
- return checkpointTime;
+ long getMostRecentCheckpointTxId() {
+ return mostRecentCheckpointTxId;
}
/**
- * Set the checkpoint time.
- *
- * This method does not persist the checkpoint time to storage immediately.
+ * Write a small file in all available storage directories that
+ * indicates that the namespace has reached some given transaction ID.
*
- * @see #setCheckpointTimeInStorage
- * @param newCpT the new checkpoint time.
- */
- public void setCheckpointTime(long newCpT) {
- checkpointTime = newCpT;
- }
-
- /**
- * Set the current checkpoint time. Writes the new checkpoint
- * time to all available storage directories.
- * @param newCpT The new checkpoint time.
- */
- public void setCheckpointTimeInStorage(long newCpT) {
- checkpointTime = newCpT;
- // Write new checkpoint time in all storage directories
- for(Iterator<StorageDirectory> it =
- dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
+ * This is used when the image is loaded to avoid accidental rollbacks
+ * in the case where an edit log is fully deleted but there is no
+ * checkpoint. See TestNameEditsConfigs.testNameEditsConfigsFailure()
+ * @param txid the txid that has been reached
+ */
+ public void writeTransactionIdFileToStorage(long txid) {
+ // Write txid marker in all storage directories
+ for (StorageDirectory sd : storageDirs) {
try {
- writeCheckpointTime(sd);
+ writeTransactionIdFile(sd, txid);
} catch(IOException e) {
// Close any edits stream associated with this dir and remove directory
- LOG.warn("incrementCheckpointTime failed on "
- + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
- try {
- reportErrorsOnDirectory(sd);
- } catch (IOException ioe) {
- LOG.error("Failed to report and remove NN storage directory "
- + sd.getRoot().getPath(), ioe);
- }
+ LOG.warn("writeTransactionIdToStorage failed on " + sd,
+ e);
+ reportErrorsOnDirectory(sd);
}
}
}
@@ -525,11 +461,11 @@ public class NNStorage extends Storage i
*
* @return List of filenames to save checkpoints to.
*/
- public File[] getFsImageNameCheckpoint() {
+ public File[] getFsImageNameCheckpoint(long txid) {
ArrayList<File> list = new ArrayList<File>();
for (Iterator<StorageDirectory> it =
dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
- list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW));
+ list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW, txid));
}
return list.toArray(new File[list.size()]);
}
@@ -538,51 +474,24 @@ public class NNStorage extends Storage i
* Return the name of the image file.
* @return The name of the first image file.
*/
- public File getFsImageName() {
+ public File getFsImageName(long txid) {
StorageDirectory sd = null;
for (Iterator<StorageDirectory> it =
dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
sd = it.next();
- File fsImage = getStorageFile(sd, NameNodeFile.IMAGE);
+ File fsImage = getStorageFile(sd, NameNodeFile.IMAGE, txid);
if(sd.getRoot().canRead() && fsImage.exists())
return fsImage;
}
return null;
}
- /**
- * @return The name of the first editlog file.
- */
- public File getFsEditName() throws IOException {
- for (Iterator<StorageDirectory> it
- = dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
- StorageDirectory sd = it.next();
- if(sd.getRoot().canRead())
- return getEditFile(sd);
- }
- return null;
- }
-
- /**
- * @return The name of the first time file.
- */
- public File getFsTimeName() {
- StorageDirectory sd = null;
- // NameNodeFile.TIME shoul be same on all directories
- for (Iterator<StorageDirectory> it =
- dirIterator(); it.hasNext();)
- sd = it.next();
- return getStorageFile(sd, NameNodeFile.TIME);
- }
-
/** Create new dfs name directory. Caution: this destroys all files
* in this filesystem. */
private void format(StorageDirectory sd) throws IOException {
sd.clearDirectory(); // create currrent dir
- for (NNStorageListener listener : listeners) {
- listener.formatOccurred(sd);
- }
writeProperties(sd);
+ writeTransactionIdFile(sd, 0);
LOG.info("Storage directory " + sd.getRoot()
+ " has been successfully formatted.");
@@ -597,7 +506,6 @@ public class NNStorage extends Storage i
this.clusterID = clusterId;
this.blockpoolID = newBlockPoolID();
this.cTime = 0L;
- this.setCheckpointTime(now());
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -624,50 +532,6 @@ public class NNStorage extends Storage i
return newID;
}
-
- /**
- * Move {@code current} to {@code lastcheckpoint.tmp} and
- * recreate empty {@code current}.
- * {@code current} is moved only if it is well formatted,
- * that is contains VERSION file.
- *
- * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
- * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
- */
- protected void moveCurrent(StorageDirectory sd)
- throws IOException {
- File curDir = sd.getCurrentDir();
- File tmpCkptDir = sd.getLastCheckpointTmp();
- // mv current -> lastcheckpoint.tmp
- // only if current is formatted - has VERSION file
- if(sd.getVersionFile().exists()) {
- assert curDir.exists() : curDir + " directory must exist.";
- assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
- rename(curDir, tmpCkptDir);
- }
- // recreate current
- if(!curDir.exists() && !curDir.mkdir())
- throw new IOException("Cannot create directory " + curDir);
- }
-
- /**
- * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint}
- *
- * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
- * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
- */
- protected void moveLastCheckpoint(StorageDirectory sd)
- throws IOException {
- File tmpCkptDir = sd.getLastCheckpointTmp();
- File prevCkptDir = sd.getPreviousCheckpoint();
- // remove previous.checkpoint
- if (prevCkptDir.exists())
- deleteDir(prevCkptDir);
- // mv lastcheckpoint.tmp -> previous.checkpoint
- if(tmpCkptDir.exists())
- rename(tmpCkptDir, prevCkptDir);
- }
-
@Override // Storage
protected void setFieldsFromProperties(
Properties props, StorageDirectory sd) throws IOException {
@@ -689,26 +553,35 @@ public class NNStorage extends Storage i
setDistributedUpgradeState(
sDUS == null? false : Boolean.parseBoolean(sDUS),
sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
+ setDeprecatedPropertiesForUpgrade(props);
+ }
- String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
- if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
- if (sMd5 == null) {
- throw new InconsistentFSStateException(sd.getRoot(),
- "file " + STORAGE_FILE_VERSION
- + " does not have MD5 image digest.");
- }
- this.imageDigest = new MD5Hash(sMd5);
- } else if (sMd5 != null) {
- throw new InconsistentFSStateException(sd.getRoot(),
- "file " + STORAGE_FILE_VERSION +
- " has image MD5 digest when version is " + layoutVersion);
+ /**
+ * Pull any properties out of the VERSION file that are from older
+ * versions of HDFS and only necessary during upgrade.
+ */
+ private void setDeprecatedPropertiesForUpgrade(Properties props) {
+ deprecatedProperties = new HashMap<String, String>();
+ String md5 = props.getProperty(DEPRECATED_MESSAGE_DIGEST_PROPERTY);
+ if (md5 != null) {
+ deprecatedProperties.put(DEPRECATED_MESSAGE_DIGEST_PROPERTY, md5);
}
-
- this.setCheckpointTime(readCheckpointTime(sd));
+ }
+
+ /**
+ * Return a property that was stored in an earlier version of HDFS.
+ *
+ * This should only be used during upgrades.
+ *
+ * @param prop the property name, e.g. DEPRECATED_MESSAGE_DIGEST_PROPERTY
+ * @return the value captured from the VERSION file properties by
+ * setDeprecatedPropertiesForUpgrade, or null if it was not present
+ * NOTE(review): in the visible code deprecatedProperties is only assigned in
+ * setDeprecatedPropertiesForUpgrade; calling this before properties are
+ * loaded would throw NullPointerException.
+ */
+ String getDeprecatedProperty(String prop) {
+ // A layout version greater than FSConstants.LAYOUT_VERSION is treated as
+ // "storage from a past version" here.
+ assert getLayoutVersion() > FSConstants.LAYOUT_VERSION :
+ "getDeprecatedProperty should only be done when loading " +
+ "storage from past versions during upgrade.";
+ return deprecatedProperties.get(prop);
 }
/**
- * Write last checkpoint time and version file into the storage directory.
+ * Write version file into the storage directory.
*
* The version file should always be written last.
* Missing or corrupted version file indicates that
@@ -733,50 +606,109 @@ public class NNStorage extends Storage i
props.setProperty("distributedUpgradeVersion",
Integer.toString(uVersion));
}
- if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
- // Though the current NN supports this feature, this function
- // is called with old layoutVersions from the upgrade tests.
- if (imageDigest == null) {
- // May be null on the first save after an upgrade.
- imageDigest = MD5Hash.digest(
- new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE)));
- }
- props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
- }
-
- writeCheckpointTime(sd);
}
-
+
+  /**
+   * Get a storage file whose name carries a transaction ID, i.e.
+   * {@code <type name>_<txid zero-padded to 19 digits>}, located in the
+   * 'current' directory of the given storage directory.
+   *
+   * @param sd the storage directory
+   * @param type the file type, used as the name prefix
+   * @param imageTxId the transaction ID embedded in the name
+   * @return the file, whether or not it exists on disk
+   */
+  static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
+    String fileName = String.format("%s_%019d", type.getName(), imageTxId);
+    File currentDir = sd.getCurrentDir();
+    return new File(currentDir, fileName);
+  }
+
/**
- * @return A File of 'type' in storage directory 'sd'.
+ * Get a storage file for one of the files that doesn't need a txid
+ * associated (e.g. version, seen_txid), located directly in the
+ * 'current' directory of the given storage directory.
+ *
+ * @param sd the storage directory
+ * @param type the file type; its name is used verbatim as the file name
+ * @return the file, whether or not it exists on disk
*/
static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
return new File(sd.getCurrentDir(), type.getName());
}
+
+  /**
+   * @param txid transaction ID the checkpoint image covers
+   * @return the checkpoint image file name (NameNodeFile.IMAGE_NEW prefix)
+   *         for txid, with the ID zero-padded to 19 digits
+   */
+  @VisibleForTesting
+  public static String getCheckpointImageFileName(long txid) {
+    String prefix = NameNodeFile.IMAGE_NEW.getName();
+    return String.format("%s_%019d", prefix, txid);
+  }
+
+  /**
+   * @param txid transaction ID the image covers
+   * @return the image file name (NameNodeFile.IMAGE prefix) for txid,
+   *         with the ID zero-padded to 19 digits
+   */
+  @VisibleForTesting
+  public static String getImageFileName(long txid) {
+    String prefix = NameNodeFile.IMAGE.getName();
+    return String.format("%s_%019d", prefix, txid);
+  }
+
+  /**
+   * @param startTxId first transaction ID of the open edit segment
+   * @return the in-progress edits file name (NameNodeFile.EDITS_INPROGRESS
+   *         prefix), with startTxId zero-padded to 19 digits
+   */
+  @VisibleForTesting
+  public static String getInProgressEditsFileName(long startTxId) {
+    String prefix = NameNodeFile.EDITS_INPROGRESS.getName();
+    return String.format("%s_%019d", prefix, startTxId);
+  }
+
+  /**
+   * @return the in-progress edits file starting at startTxId inside the
+   *         'current' directory of sd (the file need not exist)
+   */
+  static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {
+    String fileName = getInProgressEditsFileName(startTxId);
+    return new File(sd.getCurrentDir(), fileName);
+  }
+
+  /**
+   * @return the finalized edits file spanning [startTxId, endTxId] inside
+   *         the 'current' directory of sd (the file need not exist)
+   */
+  static File getFinalizedEditsFile(StorageDirectory sd,
+      long startTxId, long endTxId) {
+    String fileName = getFinalizedEditsFileName(startTxId, endTxId);
+    File currentDir = sd.getCurrentDir();
+    return new File(currentDir, fileName);
+  }
+
+  /**
+   * @return the image file for txid inside the 'current' directory of sd
+   *         (the file need not exist)
+   */
+  static File getImageFile(StorageDirectory sd, long txid) {
+    String fileName = getImageFileName(txid);
+    return new File(sd.getCurrentDir(), fileName);
+  }
+
+  /**
+   * @param startTxId first transaction ID in the segment
+   * @param endTxId last transaction ID in the segment
+   * @return the finalized edits file name (NameNodeFile.EDITS prefix), of
+   *         the form {@code <prefix>_<start>-<end>} with both IDs
+   *         zero-padded to 19 digits
+   */
+  @VisibleForTesting
+  public static String getFinalizedEditsFileName(long startTxId, long endTxId) {
+    String prefix = NameNodeFile.EDITS.getName();
+    return String.format("%s_%019d-%019d", prefix, startTxId, endTxId);
+  }
+
/**
- * @return A editlog File in storage directory 'sd'.
+   * Return the first readable finalized edits file covering the
+   * transaction range [startTxId, endTxId], searching the 'current'
+   * directory of every EDITS storage directory.
+   *
+   * @param startTxId first transaction ID in the segment
+   * @param endTxId last transaction ID in the segment
+   * @return the first matching file found
+   * @throws IOException if no directory contains such a file
*/
- File getEditFile(StorageDirectory sd) {
- return getStorageFile(sd, NameNodeFile.EDITS);
+  File findFinalizedEditsFile(long startTxId, long endTxId)
+      throws IOException {
+    String wanted = getFinalizedEditsFileName(startTxId, endTxId);
+    File found = findFile(NameNodeDirType.EDITS, wanted);
+    if (found != null) {
+      return found;
+    }
+    throw new IOException(
+        "No edits file for txid " + startTxId + "-" + endTxId + " exists!");
+  }
+
+  /**
+   * Return the first readable image file for the given txid, searching the
+   * 'current' directory of every IMAGE storage directory, or null if no
+   * such image can be found.
+   *
+   * @param txid the transaction ID encoded in the image file name
+   */
+  File findImageFile(long txid) throws IOException {
+    String imageName = getImageFileName(txid);
+    return findFile(NameNodeDirType.IMAGE, imageName);
}
/**
- * @return A temporary editlog File in storage directory 'sd'.
+   * Return the first readable storage file of the given name across the
+   * 'current' directories of all storage directories of the given type.
+   * A directory is considered only if its 'current' directory is readable.
+   *
+   * @param dirType the type of storage directory to search
+   * @param name the bare file name to look for
+   * @return the first existing match, or null if no such file exists
*/
- File getEditNewFile(StorageDirectory sd) {
- return getStorageFile(sd, NameNodeFile.EDITS_NEW);
+  private File findFile(NameNodeDirType dirType, String name) {
+    for (StorageDirectory sd : dirIterable(dirType)) {
+      File currentDir = sd.getCurrentDir();
+      File candidate = new File(currentDir, name);
+      boolean usable = currentDir.canRead() && candidate.exists();
+      if (usable) {
+        return candidate;
+      }
+    }
+    return null;
}
/**
- * @return A list of all Files of 'type' in available storage directories.
+ * Build the path of fileName inside the 'current' directory of every
+ * available storage directory of the given type.
+ *
+ * @param dirType the directory type to include, or null for all directories
+ * @param fileName the bare file name to resolve in each directory
+ * @return A list of the given File in every available storage directory,
+ * regardless of whether it might exist.
*/
- Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
+ List<File> getFiles(NameNodeDirType dirType, String fileName) {
ArrayList<File> list = new ArrayList<File>();
Iterator<StorageDirectory> it =
(dirType == null) ? dirIterator() : dirIterator(dirType);
for ( ;it.hasNext(); ) {
- list.add(getStorageFile(it.next(), type));
+ list.add(new File(it.next().getCurrentDir(), fileName));
}
return list;
}
@@ -809,7 +741,9 @@ public class NNStorage extends Storage i
* @param uVersion the new version.
*/
private void setDistributedUpgradeState(boolean uState, int uVersion) {
- upgradeManager.setUpgradeState(uState, uVersion);
+ // upgradeManager may be null; in that case this call is a no-op and the
+ // distributed upgrade state read from the VERSION file is dropped.
+ if (upgradeManager != null) {
+ upgradeManager.setUpgradeState(uState, uVersion);
+ }
}
/**
@@ -850,33 +784,6 @@ public class NNStorage extends Storage i
}
/**
- * Set the digest for the latest image stored by NNStorage.
- * @param digest The digest for the image.
- */
- void setImageDigest(MD5Hash digest) {
- this.imageDigest = digest;
- }
-
- /**
- * Get the digest for the latest image storage by NNStorage.
- * @return The digest for the latest image.
- */
- MD5Hash getImageDigest() {
- return imageDigest;
- }
-
- /**
- * Register a listener. The listener will be notified of changes to the list
- * of available storage directories.
- *
- * @see NNStorageListener
- * @param sel A storage listener.
- */
- void registerListener(NNStorageListener sel) {
- listeners.add(sel);
- }
-
- /**
* Disable the check for pre-upgradable layouts. Needed for BackupImage.
* @param val Whether to disable the preupgradeable layout check.
*/
@@ -890,7 +797,7 @@ public class NNStorage extends Storage i
* @param sds A list of storage directories to mark as errored.
* @throws IOException
*/
- void reportErrorsOnDirectories(List<StorageDirectory> sds) throws IOException {
+ void reportErrorsOnDirectories(List<StorageDirectory> sds) {
for (StorageDirectory sd : sds) {
reportErrorsOnDirectory(sd);
}
@@ -904,17 +811,12 @@ public class NNStorage extends Storage i
* @param sd A storage directory to mark as errored.
* @throws IOException
*/
- void reportErrorsOnDirectory(StorageDirectory sd)
- throws IOException {
+ void reportErrorsOnDirectory(StorageDirectory sd) {
LOG.error("Error reported on storage directory " + sd);
String lsd = listStorageDirectories();
LOG.debug("current list of storage dirs:" + lsd);
- for (NNStorageListener listener : listeners) {
- listener.errorOccurred(sd);
- }
-
LOG.warn("About to remove corresponding storage: "
+ sd.getRoot().getAbsolutePath());
try {
@@ -927,8 +829,7 @@ public class NNStorage extends Storage i
if (this.storageDirs.remove(sd)) {
this.removedStorageDirs.add(sd);
}
- incrementCheckpointTime();
-
+
lsd = listStorageDirectories();
LOG.debug("at the end current list of storage dirs:" + lsd);
}
@@ -968,6 +869,29 @@ public class NNStorage extends Storage i
}
/**
+   * Report that an IOE has occurred on some file which may
+   * or may not be within one of the NN image storage directories.
+   * If the file lies under the root of a storage directory, that directory
+   * is reported via reportErrorsOnDirectory. Otherwise nothing happens.
+   *
+   * @param f the file on which the I/O error occurred
+   */
+  void reportErrorOnFile(File f) {
+    // We use getAbsolutePath here instead of getCanonicalPath since we know
+    // that there is some IO problem on that drive.
+    // getCanonicalPath may need to call stat() or readlink() and it's likely
+    // those calls would fail due to the same underlying IO problem.
+    String absPath = f.getAbsolutePath();
+    for (StorageDirectory sd : storageDirs) {
+      String dirPath = sd.getRoot().getAbsolutePath();
+      // Terminate with the platform separator (not a hard-coded "/") so that
+      // e.g. ".../name" does not match a file under ".../name2", and so the
+      // prefix test also works where File.separator is "\".
+      if (!dirPath.endsWith(File.separator)) {
+        dirPath += File.separator;
+      }
+      if (absPath.startsWith(dirPath)) {
+        // reportErrorsOnDirectory removes sd from storageDirs, so we must
+        // stop iterating immediately afterwards.
+        reportErrorsOnDirectory(sd);
+        return;
+      }
+    }
+  }
+
+ /**
* Generate new clusterID.
*
* clusterID is a persistent attribute of the cluster.
@@ -1065,4 +989,67 @@ public class NNStorage extends Storage i
public String getBlockPoolID() {
return blockpoolID;
}
+
+  /**
+   * Hand every current storage directory, in iteration order, to the given
+   * inspector. What is collected from each directory is up to the inspector.
+   *
+   * @param inspector the inspector to run over each directory
+   * @throws IOException if the inspector fails on any directory
+   */
+  void inspectStorageDirs(FSImageStorageInspector inspector)
+      throws IOException {
+    Iterator<StorageDirectory> dirs = dirIterator();
+    while (dirs.hasNext()) {
+      inspector.inspectDirectory(dirs.next());
+    }
+  }
+
+  /**
+   * Read the VERSION file of every storage directory to find the range of
+   * layout versions present. Then pick the matching FSImageStorageInspector
+   * and run it over all storage directories.
+   *
+   * Directories without a VERSION file are skipped (with a warning) when
+   * determining the layout range. If the newest layout seen supports
+   * txid-based storage, the transactional inspector is used, and a warning
+   * is logged if older layouts are also present. Otherwise the
+   * pre-transactional inspector is used.
+   *
+   * <b>Note:</b> this can mutate the storage info fields (ctime, version, etc).
+   *
+   * @return an FSImageStorageInspector which has inspected each directory
+   * @throws IOException if no valid storage dirs are found, or if reading
+   *         or inspecting a directory fails
+   */
+  FSImageStorageInspector readAndInspectDirs()
+      throws IOException {
+    // Smaller layout version values are newer.
+    int newestLayout = Integer.MAX_VALUE;
+    int oldestLayout = Integer.MIN_VALUE;
+
+    Iterator<StorageDirectory> it = dirIterator();
+    while (it.hasNext()) {
+      StorageDirectory sd = it.next();
+      if (sd.getVersionFile().exists()) {
+        readProperties(sd); // sets layoutVersion
+        int lv = getLayoutVersion();
+        newestLayout = Math.min(newestLayout, lv);
+        oldestLayout = Math.max(oldestLayout, lv);
+      } else {
+        FSImage.LOG.warn("Storage directory " + sd + " contains no VERSION file. Skipping...");
+      }
+    }
+
+    // Both bounds are still at their sentinels only if no VERSION was read.
+    if (newestLayout > oldestLayout) {
+      throw new IOException("No storage directories contained VERSION information");
+    }
+
+    // If any directory uses the txid-based layout (edits_<txnid>), use the
+    // new inspector, which will ignore the old format dirs.
+    final FSImageStorageInspector inspector;
+    if (!LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, newestLayout)) {
+      inspector = new FSImagePreTransactionalStorageInspector();
+    } else {
+      inspector = new FSImageTransactionalStorageInspector();
+      if (!LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, oldestLayout)) {
+        FSImage.LOG.warn("Ignoring one or more storage directories with old layouts");
+      }
+    }
+
+    inspectStorageDirs(inspector);
+    return inspector;
+  }
}
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * The NNStorageRetentionManager is responsible for inspecting the storage
+ * directories of the NN and enforcing a retention policy on checkpoints
+ * and edit logs.
+ *
+ * It delegates the actual removal of files to a StoragePurger
+ * implementation, which might delete the files or instead copy them to
+ * a filer or HDFS for later analysis.
+ */
+public class NNStorageRetentionManager {
+
+  private final int numCheckpointsToRetain;
+  private static final Log LOG = LogFactory.getLog(
+      NNStorageRetentionManager.class);
+  private final NNStorage storage;
+  private final StoragePurger purger;
+  private final FSEditLog editLog;
+
+  /**
+   * @param conf configuration. DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY
+   *        sets how many of the newest images are kept.
+   * @param storage the NN storage whose directories are inspected
+   * @param editLog the edit log asked to purge segments made redundant
+   * @param purger strategy used to dispose of old images and logs
+   * @throws IllegalArgumentException if the configured number of
+   *         checkpoints to retain is less than 1
+   */
+  public NNStorageRetentionManager(
+      Configuration conf,
+      NNStorage storage,
+      FSEditLog editLog,
+      StoragePurger purger) {
+    this.numCheckpointsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT);
+    // Retaining fewer than one checkpoint would make getImageTxIdToRetain
+    // index its list at -1. Fail fast with a clear message instead of an
+    // IndexOutOfBoundsException at purge time.
+    if (numCheckpointsToRetain < 1) {
+      throw new IllegalArgumentException(
+          DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY +
+          " must be at least 1, but was " + numCheckpointsToRetain);
+    }
+    this.storage = storage;
+    this.editLog = editLog;
+    this.purger = purger;
+  }
+
+  /**
+   * Create a manager that disposes of old storage by deleting it.
+   */
+  public NNStorageRetentionManager(Configuration conf, NNStorage storage,
+      FSEditLog editLog) {
+    this(conf, storage, editLog, new DeletionStoragePurger());
+  }
+
+  /**
+   * Inspect every storage directory, then purge images older than the
+   * oldest retained checkpoint, together with the edit logs they make
+   * redundant.
+   *
+   * @throws IOException if inspecting the storage directories fails
+   */
+  public void purgeOldStorage() throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+    storage.inspectStorageDirs(inspector);
+
+    long minImageTxId = getImageTxIdToRetain(inspector);
+    purgeCheckpointsOlderThan(inspector, minImageTxId);
+    // If fsimage_N is the image we want to keep, then we need to keep
+    // all txns > N. We can remove anything < N+1, since fsimage_N
+    // reflects the state up to and including N.
+    editLog.purgeLogsOlderThan(minImageTxId + 1, purger);
+  }
+
+  /**
+   * Hand every found image whose txid is strictly below minTxId to the
+   * purger.
+   */
+  private void purgeCheckpointsOlderThan(
+      FSImageTransactionalStorageInspector inspector,
+      long minTxId) {
+    for (FoundFSImage image : inspector.getFoundImages()) {
+      if (image.getTxId() < minTxId) {
+        LOG.info("Purging old image " + image);
+        purger.purgeImage(image);
+      }
+    }
+  }
+
+  /**
+   * @param inspector inspector that has already inspected all storage dirs
+   * @return the transaction ID corresponding to the oldest checkpoint
+   *         that should be retained: the numCheckpointsToRetain-th largest
+   *         distinct image txid (or the smallest, if fewer exist), or 0 if
+   *         no images were found
+   */
+  private long getImageTxIdToRetain(FSImageTransactionalStorageInspector inspector) {
+
+    List<FoundFSImage> images = inspector.getFoundImages();
+    // The same txid may appear in several storage dirs, so de-duplicate.
+    TreeSet<Long> imageTxIds = Sets.newTreeSet();
+    for (FoundFSImage image : images) {
+      imageTxIds.add(image.getTxId());
+    }
+
+    List<Long> imageTxIdsList = Lists.newArrayList(imageTxIds);
+    if (imageTxIdsList.isEmpty()) {
+      return 0;
+    }
+
+    // Newest first. numCheckpointsToRetain >= 1 is guaranteed by the
+    // constructor, so toRetain - 1 is a valid index.
+    Collections.reverse(imageTxIdsList);
+    int toRetain = Math.min(numCheckpointsToRetain, imageTxIdsList.size());
+    long minTxId = imageTxIdsList.get(toRetain - 1);
+    LOG.info("Going to retain " + toRetain + " images with txid >= " +
+        minTxId);
+    return minTxId;
+  }
+
+  /**
+   * Interface responsible for disposing of old checkpoints and edit logs.
+   */
+  static interface StoragePurger {
+    /** Dispose of an edit log segment that is no longer needed. */
+    void purgeLog(FoundEditLog log);
+    /** Dispose of an image (and any companion files) no longer needed. */
+    void purgeImage(FoundFSImage image);
+  }
+
+  /**
+   * StoragePurger that deletes files in place, logging a warning for any
+   * file that could not be deleted.
+   */
+  static class DeletionStoragePurger implements StoragePurger {
+    @Override
+    public void purgeLog(FoundEditLog log) {
+      deleteOrWarn(log.getFile());
+    }
+
+    @Override
+    public void purgeImage(FoundFSImage image) {
+      deleteOrWarn(image.getFile());
+      // Also remove the image's companion MD5 digest file.
+      deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
+    }
+
+    private static void deleteOrWarn(File file) {
+      if (!file.delete()) {
+        // It's OK if we fail to delete something -- we'll catch it
+        // next time we swing through this directory.
+        LOG.warn("Could not delete " + file);
+      }
+    }
+  }
+}