Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 00:32:14 UTC
svn commit: r1293964 [7/11] - in
/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs:
./ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/...
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sun Feb 26 23:32:06 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -1069,6 +1072,112 @@ public class FSEditLog {
}
/**
+ * Find the best edit log input stream to read from, starting at fromTxId.
+ * Here "best" means the edit log which has the largest contiguous range
+ * of transactions starting from the transaction id, fromTxId.
+ *
+ * If a journal throws a CorruptionException while reading from a txn id,
+ * it means that it has more transactions, but can't find any from fromTxId.
+ * If this is the case and no other journal has transactions, we should throw
+ * an exception, as it means more transactions exist but we can't load them.
+ *
+ * @param fromTxId Transaction id to start from.
+ * @return an edit log input stream with transactions starting at fromTxId,
+ * or null if no more exist
+ */
+ private EditLogInputStream selectStream(long fromTxId)
+ throws IOException {
+ JournalManager bestjm = null;
+ long bestjmNumTxns = 0;
+ CorruptionException corruption = null;
+
+ for (JournalAndStream jas : journals) {
+ JournalManager candidate = jas.getManager();
+ long candidateNumTxns = 0;
+ try {
+ candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
+ } catch (CorruptionException ce) {
+ corruption = ce;
+ } catch (IOException ioe) {
+ LOG.warn("Error reading number of transactions from " + candidate);
+ continue; // error reading disk, just skip
+ }
+
+ if (candidateNumTxns > bestjmNumTxns) {
+ bestjm = candidate;
+ bestjmNumTxns = candidateNumTxns;
+ }
+ }
+
+
+ if (bestjm == null) {
+ /*
+ * If all candidates either threw a CorruptionException or
+ * found 0 transactions, then a gap exists.
+ */
+ if (corruption != null) {
+ throw new IOException("Gap exists in logs from "
+ + fromTxId, corruption);
+ } else {
+ return null;
+ }
+ }
+
+ return bestjm.getInputStream(fromTxId);
+ }
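
For readers following the selection policy, here is a minimal standalone sketch of the same logic against a hypothetical Journal interface (the names are illustrative, not the real JournalManager API):

    import java.io.IOException;
    import java.util.List;

    class BestJournalSketch {
      static class CorruptionException extends IOException {
        CorruptionException(String msg) { super(msg); }
      }

      interface Journal {
        // stand-in for JournalManager.getNumberOfTransactions(fromTxId)
        long transactionsFrom(long fromTxId) throws IOException;
      }

      static Journal pickBest(List<Journal> journals, long fromTxId)
          throws IOException {
        Journal best = null;
        long bestTxns = 0;
        CorruptionException corruption = null;
        for (Journal candidate : journals) {
          long txns;
          try {
            txns = candidate.transactionsFrom(fromTxId);
          } catch (CorruptionException ce) {
            corruption = ce; // has more txns, but unreadable from fromTxId
            continue;
          } catch (IOException ioe) {
            continue;        // disk error; just skip this journal
          }
          if (txns > bestTxns) {
            best = candidate;
            bestTxns = txns;
          }
        }
        if (best == null && corruption != null) {
          // every journal was empty or corrupt from fromTxId: a real gap
          throw new IOException("Gap exists in logs from " + fromTxId,
              corruption);
        }
        return best;         // null simply means no more transactions
      }
    }
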
+
+ /**
+ * Run recovery on all journals to recover any unclosed segments
+ */
+ void recoverUnclosedStreams() {
+ mapJournalsAndReportErrors(new JournalClosure() {
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.manager.recoverUnfinalizedSegments();
+ }
+ }, "recovering unclosed streams");
+ }
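
recoverUnclosedStreams leans on the journal-closure idiom used throughout this class. A compact sketch of that idiom, with hypothetical stand-in types:

    import java.io.IOException;
    import java.util.List;

    class JournalClosureSketch {
      interface Journal { void recoverUnfinalizedSegments() throws IOException; }
      interface JournalClosure { void apply(Journal j) throws IOException; }

      // Mirrors mapJournalsAndReportErrors: run the closure against every
      // journal, reporting per-journal failures instead of aborting the map.
      static void mapAndReport(List<Journal> journals, JournalClosure closure,
          String status) {
        for (Journal j : journals) {
          try {
            closure.apply(j);
          } catch (IOException ioe) {
            System.err.println("Error " + status + " (journal " + j + ")");
          }
        }
      }

      static void recoverUnclosedStreams(List<Journal> journals) {
        mapAndReport(journals, j -> j.recoverUnfinalizedSegments(),
            "recovering unclosed streams");
      }
    }
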
+
+ /**
+ * Select a list of input streams to load.
+ * @param fromTxId first transaction in the selected streams
+ * @param toAtLeastTxId the selected streams must contain this transaction
+ */
+ Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
+ throws IOException {
+ List<EditLogInputStream> streams = Lists.newArrayList();
+
+ boolean gapFound = false;
+ EditLogInputStream stream = selectStream(fromTxId);
+ while (stream != null) {
+ fromTxId = stream.getLastTxId() + 1;
+ streams.add(stream);
+ try {
+ stream = selectStream(fromTxId);
+ } catch (IOException ioe) {
+ gapFound = true;
+ break;
+ }
+ }
+ if (fromTxId <= toAtLeastTxId || gapFound) {
+ closeAllStreams(streams);
+ throw new IOException("No non-corrupt logs for txid "
+ + fromTxId);
+ }
+ return streams;
+ }
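
The loop above strings segments together until it either runs out of logs or hits a gap. The same chaining, sketched over plain txid ranges rather than real streams:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class StreamChainSketch {
      // Each long[]{first, last} models one edit log segment.
      static List<long[]> select(List<long[]> segments, long fromTxId,
          long toAtLeastTxId) {
        List<long[]> chosen = new ArrayList<long[]>();
        boolean advanced = true;
        while (advanced) {
          advanced = false;
          for (long[] seg : segments) {
            if (seg[0] == fromTxId) {  // next segment must start exactly here
              chosen.add(seg);
              fromTxId = seg[1] + 1;
              advanced = true;
              break;
            }
          }
        }
        if (fromTxId <= toAtLeastTxId) { // chain stopped short: a gap
          throw new IllegalStateException("No logs for txid " + fromTxId);
        }
        return chosen;
      }

      public static void main(String[] args) {
        List<long[]> segs = Arrays.asList(
            new long[] {1, 10}, new long[] {11, 25}, new long[] {40, 50});
        select(segs, 1, 20);  // ok: 1-10 then 11-25 covers txid 20
        select(segs, 1, 30);  // throws: txid 26 is missing
      }
    }
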
+
+ /**
+ * Close all the streams in a collection
+ * @param streams The list of streams to close
+ */
+ static void closeAllStreams(Iterable<EditLogInputStream> streams) {
+ for (EditLogInputStream s : streams) {
+ IOUtils.closeStream(s);
+ }
+ }
+
+ /**
* Container for a JournalManager paired with its currently
* active stream.
*
@@ -1137,30 +1246,5 @@ public class FSEditLog {
JournalManager getManager() {
return manager;
}
-
- private EditLogInputStream getInProgressInputStream() throws IOException {
- return manager.getInProgressInputStream(segmentStartsAtTxId);
- }
- }
-
- /**
- * @return an EditLogInputStream that reads from the same log that
- * the edit log is currently writing. This is used from the BackupNode
- * during edits synchronization.
- * @throws IOException if no valid logs are available.
- */
- synchronized EditLogInputStream getInProgressFileInputStream()
- throws IOException {
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
- try {
- EditLogInputStream in = jas.getInProgressInputStream();
- if (in != null) return in;
- } catch (IOException ioe) {
- LOG.warn("Unable to get the in-progress input stream from " + jas,
- ioe);
- }
- }
- throw new IOException("No in-progress stream provided edits");
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Sun Feb 26 23:32:06 2012
@@ -467,24 +467,6 @@ public class FSEditLogLoader {
}
}
- static EditLogValidation validateEditLog(File file) throws IOException {
- EditLogFileInputStream in;
- try {
- in = new EditLogFileInputStream(file);
- } catch (LogHeaderCorruptException corrupt) {
- // If it's missing its header, this is equivalent to no transactions
- FSImage.LOG.warn("Log at " + file + " has no valid header",
- corrupt);
- return new EditLogValidation(0, 0);
- }
-
- try {
- return validateEditLog(in);
- } finally {
- IOUtils.closeStream(in);
- }
- }
-
/**
* Return the number of valid transactions in the stream. If the stream is
* truncated during the header, returns a value indicating that there are
@@ -494,12 +476,26 @@ public class FSEditLogLoader {
* if the log does not exist)
*/
static EditLogValidation validateEditLog(EditLogInputStream in) {
- long numValid = 0;
long lastPos = 0;
+ long firstTxId = HdfsConstants.INVALID_TXID;
+ long lastTxId = HdfsConstants.INVALID_TXID;
+ long numValid = 0;
try {
+ FSEditLogOp op = null;
while (true) {
lastPos = in.getPosition();
- if (in.readOp() == null) {
+ if ((op = in.readOp()) == null) {
+ break;
+ }
+ if (firstTxId == HdfsConstants.INVALID_TXID) {
+ firstTxId = op.txid;
+ }
+ if (lastTxId == HdfsConstants.INVALID_TXID
+ || op.txid == lastTxId + 1) {
+ lastTxId = op.txid;
+ } else {
+ FSImage.LOG.error("Out of order txid found. Found " + op.txid
+ + ", expected " + (lastTxId + 1));
break;
}
numValid++;
@@ -510,16 +506,33 @@ public class FSEditLogLoader {
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length.", t);
}
- return new EditLogValidation(lastPos, numValid);
+ return new EditLogValidation(lastPos, firstTxId, lastTxId);
}
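
The new loop tracks the first and last txids while scanning, and stops at the first out-of-order id. A standalone sketch of that scan over a plain array of txids (the sentinel value is a stand-in for HdfsConstants.INVALID_TXID):

    class TxidScanSketch {
      static final long INVALID_TXID = -12345; // stand-in sentinel

      // Returns {firstTxId, lastTxId} of the leading contiguous run,
      // mirroring the validation loop: stop on an out-of-order txid.
      static long[] scan(long[] txids) {
        long first = INVALID_TXID, last = INVALID_TXID;
        for (long txid : txids) {
          if (first == INVALID_TXID) {
            first = txid;
          }
          if (last == INVALID_TXID || txid == last + 1) {
            last = txid;
          } else {
            System.err.println("Out of order txid found. Found " + txid
                + ", expected " + (last + 1));
            break;
          }
        }
        return new long[] { first, last };
      }

      public static void main(String[] args) {
        long[] r = scan(new long[] { 5, 6, 7, 9 }); // 9 breaks the run
        System.out.println(r[0] + ".." + r[1]);     // prints 5..7
      }
    }
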
static class EditLogValidation {
- long validLength;
- long numTransactions;
-
- EditLogValidation(long validLength, long numTransactions) {
+ private long validLength;
+ private long startTxId;
+ private long endTxId;
+
+ EditLogValidation(long validLength,
+ long startTxId, long endTxId) {
this.validLength = validLength;
- this.numTransactions = numTransactions;
+ this.startTxId = startTxId;
+ this.endTxId = endTxId;
+ }
+
+ long getValidLength() { return validLength; }
+
+ long getStartTxId() { return startTxId; }
+
+ long getEndTxId() { return endTxId; }
+
+ long getNumTransactions() {
+ if (endTxId == HdfsConstants.INVALID_TXID
+ || startTxId == HdfsConstants.INVALID_TXID) {
+ return 0;
+ }
+ return (endTxId - startTxId) + 1;
}
}
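
Deriving the count from the endpoints replaces the old explicit numTransactions counter. The arithmetic, with a worked example (again using a stand-in sentinel):

    class NumTransactionsSketch {
      static final long INVALID_TXID = -12345; // stand-in sentinel

      static long numTransactions(long startTxId, long endTxId) {
        if (startTxId == INVALID_TXID || endTxId == INVALID_TXID) {
          return 0;                       // no complete op was ever read
        }
        return (endTxId - startTxId) + 1; // inclusive range: 5..7 holds 3
      }

      public static void main(String[] args) {
        System.out.println(numTransactions(5, 7));            // 3
        System.out.println(numTransactions(INVALID_TXID, 7)); // 0
      }
    }
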
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sun Feb 26 23:32:06 2012
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.com
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
+
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -70,7 +70,6 @@ import com.google.common.collect.Lists;
public class FSImage implements Closeable {
protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
- protected FSNamesystem namesystem = null;
protected FSEditLog editLog = null;
private boolean isUpgradeFinalized = false;
@@ -82,38 +81,20 @@ public class FSImage implements Closeabl
*/
protected long lastAppliedTxId = 0;
- /**
- * URIs for importing an image from a checkpoint. In the default case,
- * URIs will represent directories.
- */
- private Collection<URI> checkpointDirs;
- private Collection<URI> checkpointEditsDirs;
-
final private Configuration conf;
private final NNStorageRetentionManager archivalManager;
- /**
- * Construct an FSImage.
- * @param conf Configuration
- * @see #FSImage(Configuration conf, FSNamesystem ns,
- * Collection imageDirs, Collection editsDirs)
- * @throws IOException if default directories are invalid.
- */
- public FSImage(Configuration conf) throws IOException {
- this(conf, (FSNamesystem)null);
- }
/**
* Construct an FSImage
* @param conf Configuration
- * @param ns The FSNamesystem using this image.
- * @see #FSImage(Configuration conf, FSNamesystem ns,
+ * @see #FSImage(Configuration conf,
* Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
- private FSImage(Configuration conf, FSNamesystem ns) throws IOException {
- this(conf, ns,
+ protected FSImage(Configuration conf) throws IOException {
+ this(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
}
@@ -124,17 +105,14 @@ public class FSImage implements Closeabl
* Setup storage and initialize the edit log.
*
* @param conf Configuration
- * @param ns The FSNamesystem using this image.
* @param imageDirs Directories the image can be stored in.
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
- protected FSImage(Configuration conf, FSNamesystem ns,
+ protected FSImage(Configuration conf,
Collection<URI> imageDirs, Collection<URI> editsDirs)
throws IOException {
this.conf = conf;
- setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
- FSImage.getCheckpointEditsDirs(conf, null));
storage = new NNStorage(conf, imageDirs, editsDirs);
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
@@ -143,31 +121,18 @@ public class FSImage implements Closeabl
}
this.editLog = new FSEditLog(storage);
- setFSNamesystem(ns);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}
-
- protected FSNamesystem getFSNamesystem() {
- return namesystem;
- }
-
- void setFSNamesystem(FSNamesystem ns) {
- namesystem = ns;
- if (ns != null) {
- storage.setUpgradeManager(ns.upgradeManager);
- }
- }
- void setCheckpointDirectories(Collection<URI> dirs,
- Collection<URI> editsDirs) {
- checkpointDirs = dirs;
- checkpointEditsDirs = editsDirs;
- }
-
- void format(String clusterId) throws IOException {
+ void format(FSNamesystem fsn, String clusterId) throws IOException {
+ long fileCount = fsn.getTotalFiles();
+ // Expect 1 file, which is the root inode
+ Preconditions.checkState(fileCount == 1,
+ "FSImage.format should be called with an uninitialized namesystem, has " +
+ fileCount + " files");
storage.format(clusterId);
- saveFSImageInAllDirs(0);
+ saveFSImageInAllDirs(fsn, 0);
}
/**
@@ -179,7 +144,7 @@ public class FSImage implements Closeabl
* @throws IOException
* @return true if the image needs to be saved or false otherwise
*/
- boolean recoverTransitionRead(StartupOption startOpt)
+ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
@@ -187,21 +152,14 @@ public class FSImage implements Closeabl
Collection<URI> imageDirs = storage.getImageDirectories();
Collection<URI> editsDirs = storage.getEditsDirectories();
+
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
- if(startOpt == StartupOption.IMPORT
- && (checkpointDirs == null || checkpointDirs.isEmpty()))
- throw new IOException("Cannot import image from a checkpoint. "
- + "\"dfs.namenode.checkpoint.dir\" is not set." );
-
- if(startOpt == StartupOption.IMPORT
- && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
- throw new IOException("Cannot import image from a checkpoint. "
- + "\"dfs.namenode.checkpoint.dir\" is not set." );
+ storage.setUpgradeManager(target.upgradeManager);
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
@@ -261,10 +219,10 @@ public class FSImage implements Closeabl
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
- doUpgrade();
+ doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
- doImportCheckpoint();
+ doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
doRollback();
@@ -273,7 +231,7 @@ public class FSImage implements Closeabl
// just load the image
}
- return loadFSImage();
+ return loadFSImage(target);
}
/**
@@ -324,11 +282,11 @@ public class FSImage implements Closeabl
return isFormatted;
}
- private void doUpgrade() throws IOException {
+ private void doUpgrade(FSNamesystem target) throws IOException {
if(storage.getDistributedUpgradeState()) {
// only distributed upgrade need to continue
// don't do version upgrade
- this.loadFSImage();
+ this.loadFSImage(target);
storage.initializeDistributedUpgrade();
return;
}
@@ -343,7 +301,7 @@ public class FSImage implements Closeabl
}
// load the latest image
- this.loadFSImage();
+ this.loadFSImage(target);
// Do upgrade for each directory
long oldCTime = storage.getCTime();
@@ -385,7 +343,7 @@ public class FSImage implements Closeabl
storage.reportErrorsOnDirectories(errorSDs);
errorSDs.clear();
- saveFSImageInAllDirs(editLog.getLastWrittenTxId());
+ saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -422,7 +380,7 @@ public class FSImage implements Closeabl
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
- FSImage prevState = new FSImage(conf, getFSNamesystem());
+ FSImage prevState = new FSImage(conf);
prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -504,19 +462,32 @@ public class FSImage implements Closeabl
/**
* Load image from a checkpoint directory and save it into the current one.
+ * @param target the NameSystem to import into
* @throws IOException
*/
- void doImportCheckpoint() throws IOException {
- FSNamesystem fsNamesys = getFSNamesystem();
- FSImage ckptImage = new FSImage(conf, fsNamesys,
+ void doImportCheckpoint(FSNamesystem target) throws IOException {
+ Collection<URI> checkpointDirs =
+ FSImage.getCheckpointDirs(conf, null);
+ Collection<URI> checkpointEditsDirs =
+ FSImage.getCheckpointEditsDirs(conf, null);
+
+ if (checkpointDirs == null || checkpointDirs.isEmpty()) {
+ throw new IOException("Cannot import image from a checkpoint. "
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
+ }
+
+ if (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()) {
+ throw new IOException("Cannot import image from a checkpoint. "
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
+ }
+
+ FSImage realImage = target.getFSImage();
+ FSImage ckptImage = new FSImage(conf,
checkpointDirs, checkpointEditsDirs);
- // replace real image with the checkpoint image
- FSImage realImage = fsNamesys.getFSImage();
- assert realImage == this;
- fsNamesys.dir.fsImage = ckptImage;
+ target.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
- ckptImage.recoverTransitionRead(StartupOption.REGULAR);
+ ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
} finally {
ckptImage.close();
}
@@ -524,10 +495,11 @@ public class FSImage implements Closeabl
realImage.getStorage().setStorageInfo(ckptImage.getStorage());
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
- fsNamesys.dir.fsImage = realImage;
+ target.dir.fsImage = realImage;
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
+
// and save it but keep the same checkpointTime
- saveNamespace();
+ saveNamespace(target);
getStorage().writeAll();
}
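
The import path above temporarily points the namesystem at a second FSImage built over the checkpoint directories, loads through it, then swaps the real image back before saving. The shape of that swap-load-restore pattern, reduced to stand-in types:

    import java.io.Closeable;
    import java.io.IOException;

    class ImportCheckpointSketch {
      interface Image extends Closeable { void load() throws IOException; }

      static class Target {
        Image image; // the namesystem's current image (illustrative field)
      }

      // Temporarily swap in the checkpoint image, load through it, and
      // always close it; then restore the real image for the final save.
      static void importCheckpoint(Target target, Image real, Image ckpt)
          throws IOException {
        target.image = ckpt;
        try {
          ckpt.load();
        } finally {
          ckpt.close();
        }
        target.image = real; // swap back before saving the namespace
      }
    }
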
@@ -558,11 +530,11 @@ public class FSImage implements Closeabl
* Toss the current image and namesystem, reloading from the specified
* file.
*/
- void reloadFromImageFile(File file) throws IOException {
- namesystem.dir.reset();
+ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
+ target.dir.reset();
LOG.debug("Reloading namespace from " + file);
- loadFSImage(file);
+ loadFSImage(file, target);
}
/**
@@ -580,36 +552,42 @@ public class FSImage implements Closeabl
* @return whether the image should be saved
* @throws IOException
*/
- boolean loadFSImage() throws IOException {
+ boolean loadFSImage(FSNamesystem target) throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
isUpgradeFinalized = inspector.isUpgradeFinalized();
-
+
+ FSImageStorageInspector.FSImageFile imageFile
+ = inspector.getLatestImage();
boolean needToSave = inspector.needToSave();
-
- // Plan our load. This will throw if it's impossible to load from the
- // data that's available.
- LoadPlan loadPlan = inspector.createLoadPlan();
- LOG.debug("Planning to load image using following plan:\n" + loadPlan);
-
-
- // Recover from previous interrupted checkpoint, if any
- needToSave |= loadPlan.doRecovery();
-
- //
- // Load in bits
- //
- StorageDirectory sdForProperties =
- loadPlan.getStorageDirectoryForProperties();
- storage.readProperties(sdForProperties);
- File imageFile = loadPlan.getImageFile();
+ Iterable<EditLogInputStream> editStreams = null;
+
+ editLog.recoverUnclosedStreams();
+
+ if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
+ getLayoutVersion())) {
+ editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
+ inspector.getMaxSeenTxId());
+ } else {
+ editStreams = FSImagePreTransactionalStorageInspector
+ .getEditLogStreams(storage);
+ }
+
+ LOG.debug("Planning to load image :\n" + imageFile);
+ for (EditLogInputStream l : editStreams) {
+ LOG.debug("\t Planning to load edit stream: " + l);
+ }
+
try {
+ StorageDirectory sdForProperties = imageFile.sd;
+ storage.readProperties(sdForProperties);
+
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) {
// For txid-based layout, we should have a .md5 file
// next to the image file
- loadFSImage(imageFile);
+ loadFSImage(imageFile.getFile(), target);
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
getLayoutVersion())) {
// In 0.22, we have the checksum stored in the VERSION file.
@@ -621,17 +599,19 @@ public class FSImage implements Closeabl
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
" not set for storage directory " + sdForProperties.getRoot());
}
- loadFSImage(imageFile, new MD5Hash(md5));
+ loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
} else {
// We don't have any record of the md5sum
- loadFSImage(imageFile, null);
+ loadFSImage(imageFile.getFile(), null, target);
}
} catch (IOException ioe) {
- throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
+ FSEditLog.closeAllStreams(editStreams);
+ throw new IOException("Failed to load image from " + imageFile, ioe);
}
- long numLoaded = loadEdits(loadPlan.getEditsFiles());
- needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
+ long numLoaded = loadEdits(editStreams, target);
+ needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
+ numLoaded);
// update the txid for the edit log
editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
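
The branch above keys off LayoutVersion.supports. As background, HDFS layout versions are negative and decrease as features are added, so a feature is supported by any version at or below the one that introduced it. A sketch of such a check (the introduction version below is made up for illustration):

    class LayoutGateSketch {
      static final int TXID_BASED_LAYOUT_INTRODUCED = -38; // illustrative

      static boolean supportsTxidBasedLayout(int layoutVersion) {
        // more negative == newer layout, so newer layouts pass the gate
        return layoutVersion <= TXID_BASED_LAYOUT_INTRODUCED;
      }

      public static void main(String[] args) {
        System.out.println(supportsTxidBasedLayout(-40)); // true: newer
        System.out.println(supportsTxidBasedLayout(-22)); // false: older
      }
    }
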
@@ -663,26 +643,30 @@ public class FSImage implements Closeabl
* Load the specified list of edit files into the image.
* @return the number of transactions loaded
*/
- protected long loadEdits(List<File> editLogs) throws IOException {
- LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editLogs));
+ protected long loadEdits(Iterable<EditLogInputStream> editStreams,
+ FSNamesystem target) throws IOException {
+ LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
long startingTxId = getLastAppliedTxId() + 1;
-
- FSEditLogLoader loader = new FSEditLogLoader(namesystem);
int numLoaded = 0;
- // Load latest edits
- for (File edits : editLogs) {
- LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
- EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
- int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
- startingTxId += thisNumLoaded;
- numLoaded += thisNumLoaded;
- lastAppliedTxId += thisNumLoaded;
- editIn.close();
+
+ try {
+ FSEditLogLoader loader = new FSEditLogLoader(target);
+
+ // Load latest edits
+ for (EditLogInputStream editIn : editStreams) {
+ LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
+ int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+ startingTxId += thisNumLoaded;
+ numLoaded += thisNumLoaded;
+ lastAppliedTxId += thisNumLoaded;
+ }
+ } finally {
+ FSEditLog.closeAllStreams(editStreams);
}
// update the counts
- getFSNamesystem().dir.updateCountForINodeWithQuota();
+ target.dir.updateCountForINodeWithQuota();
return numLoaded;
}
@@ -691,13 +675,14 @@ public class FSImage implements Closeabl
* Load the image namespace from the given image file, verifying
* it against the MD5 sum stored in its associated .md5 file.
*/
- private void loadFSImage(File imageFile) throws IOException {
+ private void loadFSImage(File imageFile, FSNamesystem target)
+ throws IOException {
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
if (expectedMD5 == null) {
throw new IOException("No MD5 file found corresponding to image file "
+ imageFile);
}
- loadFSImage(imageFile, expectedMD5);
+ loadFSImage(imageFile, expectedMD5, target);
}
/**
@@ -705,11 +690,12 @@ public class FSImage implements Closeabl
* filenames and blocks. Return whether we should
* "re-save" and consolidate the edit-logs
*/
- private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
+ private void loadFSImage(File curFile, MD5Hash expectedMd5,
+ FSNamesystem target) throws IOException {
FSImageFormat.Loader loader = new FSImageFormat.Loader(
- conf, getFSNamesystem());
+ conf, target);
loader.load(curFile);
- namesystem.setBlockPoolId(this.getBlockPoolID());
+ target.setBlockPoolId(this.getBlockPoolID());
// Check that the image digest we loaded matches up with what
// we expected
@@ -730,13 +716,14 @@ public class FSImage implements Closeabl
/**
* Save the contents of the FS image to the file.
*/
- void saveFSImage(StorageDirectory sd, long txid) throws IOException {
+ void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
+ throws IOException {
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
FSImageFormat.Saver saver = new FSImageFormat.Saver();
FSImageCompression compression = FSImageCompression.createCompression(conf);
- saver.save(newFile, txid, getFSNamesystem(), compression);
+ saver.save(newFile, txid, source, compression);
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointInfo(txid, Util.now());
@@ -757,8 +744,11 @@ public class FSImage implements Closeabl
private StorageDirectory sd;
private List<StorageDirectory> errorSDs;
private final long txid;
+ private final FSNamesystem source;
- FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs, long txid) {
+ FSImageSaver(FSNamesystem source, StorageDirectory sd,
+ List<StorageDirectory> errorSDs, long txid) {
+ this.source = source;
this.sd = sd;
this.errorSDs = errorSDs;
this.txid = txid;
@@ -766,7 +756,7 @@ public class FSImage implements Closeabl
public void run() {
try {
- saveFSImage(sd, txid);
+ saveFSImage(source, sd, txid);
} catch (Throwable t) {
LOG.error("Unable to save image for " + sd.getRoot(), t);
errorSDs.add(sd);
@@ -795,7 +785,7 @@ public class FSImage implements Closeabl
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
- void saveNamespace() throws IOException {
+ void saveNamespace(FSNamesystem source) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@@ -806,7 +796,7 @@ public class FSImage implements Closeabl
}
long imageTxId = editLog.getLastWrittenTxId();
try {
- saveFSImageInAllDirs(imageTxId);
+ saveFSImageInAllDirs(source, imageTxId);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@@ -818,7 +808,8 @@ public class FSImage implements Closeabl
}
- protected void saveFSImageInAllDirs(long txid) throws IOException {
+ protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
+ throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
@@ -831,7 +822,7 @@ public class FSImage implements Closeabl
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
- FSImageSaver saver = new FSImageSaver(sd, errorSDs, txid);
+ FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Sun Feb 26 23:32:06 2012
@@ -556,8 +556,13 @@ class FSImageFormat {
DataOutputStream out = new DataOutputStream(fos);
try {
out.writeInt(HdfsConstants.LAYOUT_VERSION);
- out.writeInt(sourceNamesystem.getFSImage()
- .getStorage().getNamespaceID()); // TODO bad dependency
+ // We use the non-locked version of getNamespaceInfo here since
+ // the coordinating thread of saveNamespace already has read-locked
+ // the namespace for us. If we attempt to take another readlock
+ // from the actual saver thread, there's a potential of a
+ // fairness-related deadlock. See the comments on HDFS-2223.
+ out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
+ .getNamespaceID());
out.writeLong(fsDir.rootDir.numItemsInTree());
out.writeLong(sourceNamesystem.getGenerationStamp());
out.writeLong(txid);
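
The comment above is worth unpacking: FSNamesystem constructs its lock with fairness enabled, and with a fair ReentrantReadWriteLock a new read-lock request queues behind any waiting writer, even while another thread already holds the read lock. A minimal standalone demonstration (not HDFS code) that uses a timed tryLock so it reports the blockage instead of hanging:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class FairRwlockSketch {
      public static void main(String[] args) throws InterruptedException {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

        lock.readLock().lock(); // "coordinating thread" holds the read lock

        // A writer queues up and blocks behind the held read lock.
        Thread writer = new Thread(() -> lock.writeLock().lock());
        writer.start();
        Thread.sleep(200); // let the writer reach the wait queue

        // A "saver thread" now asks for its own read lock. With fair
        // ordering it must wait behind the queued writer, which can never
        // run while the first read lock is held. If the coordinator waited
        // on this thread, that would deadlock; here we just time out.
        Thread saver = new Thread(() -> {
          try {
            if (!lock.readLock().tryLock(1, TimeUnit.SECONDS)) {
              System.out.println("saver could not re-acquire the read lock");
            }
          } catch (InterruptedException ignored) {}
        });
        saver.start();
        saver.join();
        lock.readLock().unlock();
      }
    }

The fix in the change above is simply to read the already-guarded state without taking the lock again, which is what unprotectedGetNamespaceInfo provides.
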
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInsp
private boolean hasOutOfDateStorageDirs = false;
/* Flag set false if there are any "previous" directories found */
private boolean isUpgradeFinalized = true;
+ private boolean needToSaveAfterRecovery = false;
// Track the name and edits dir with the latest times
private long latestNameCheckpointTime = Long.MIN_VALUE;
@@ -139,15 +141,15 @@ class FSImagePreTransactionalStorageInsp
boolean isUpgradeFinalized() {
return isUpgradeFinalized;
}
-
+
@Override
- LoadPlan createLoadPlan() throws IOException {
+ FSImageFile getLatestImage() throws IOException {
// We should have at least one image and one edits dirs
if (latestNameSD == null)
throw new IOException("Image file is not found in " + imageDirs);
if (latestEditsSD == null)
throw new IOException("Edits file is not found in " + editsDirs);
-
+
// Make sure we are loading image and edits from same checkpoint
if (latestNameCheckpointTime > latestEditsCheckpointTime
&& latestNameSD != latestEditsSD
@@ -168,92 +170,70 @@ class FSImagePreTransactionalStorageInsp
"image checkpoint time = " + latestNameCheckpointTime +
"edits checkpoint time = " + latestEditsCheckpointTime);
}
+
+ needToSaveAfterRecovery = doRecovery();
- return new PreTransactionalLoadPlan();
+ return new FSImageFile(latestNameSD,
+ NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
+ HdfsConstants.INVALID_TXID);
}
-
+
@Override
boolean needToSave() {
return hasOutOfDateStorageDirs ||
checkpointTimes.size() != 1 ||
- latestNameCheckpointTime > latestEditsCheckpointTime;
-
+ latestNameCheckpointTime > latestEditsCheckpointTime ||
+ needToSaveAfterRecovery;
}
- private class PreTransactionalLoadPlan extends LoadPlan {
-
- @Override
- boolean doRecovery() throws IOException {
- LOG.debug(
+ boolean doRecovery() throws IOException {
+ LOG.debug(
"Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
- boolean needToSave = false;
- File curFile =
- NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
- File ckptFile =
- NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
-
- //
- // If we were in the midst of a checkpoint
- //
- if (ckptFile.exists()) {
- needToSave = true;
- if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
- .exists()) {
- //
- // checkpointing migth have uploaded a new
- // merged image, but we discard it here because we are
- // not sure whether the entire merged image was uploaded
- // before the namenode crashed.
- //
- if (!ckptFile.delete()) {
- throw new IOException("Unable to delete " + ckptFile);
- }
- } else {
- //
- // checkpointing was in progress when the namenode
- // shutdown. The fsimage.ckpt was created and the edits.new
- // file was moved to edits. We complete that checkpoint by
- // moving fsimage.new to fsimage. There is no need to
- // update the fstime file here. renameTo fails on Windows
- // if the destination file already exists.
- //
+ boolean needToSave = false;
+ File curFile =
+ NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+ File ckptFile =
+ NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
+
+ //
+ // If we were in the midst of a checkpoint
+ //
+ if (ckptFile.exists()) {
+ needToSave = true;
+ if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
+ .exists()) {
+ //
+ // checkpointing might have uploaded a new
+ // merged image, but we discard it here because we are
+ // not sure whether the entire merged image was uploaded
+ // before the namenode crashed.
+ //
+ if (!ckptFile.delete()) {
+ throw new IOException("Unable to delete " + ckptFile);
+ }
+ } else {
+ //
+ // checkpointing was in progress when the namenode
+ // shutdown. The fsimage.ckpt was created and the edits.new
+ // file was moved to edits. We complete that checkpoint by
+ // moving fsimage.new to fsimage. There is no need to
+ // update the fstime file here. renameTo fails on Windows
+ // if the destination file already exists.
+ //
+ if (!ckptFile.renameTo(curFile)) {
+ if (!curFile.delete())
+ LOG.warn("Unable to delete dir " + curFile + " before rename");
if (!ckptFile.renameTo(curFile)) {
- if (!curFile.delete())
- LOG.warn("Unable to delete dir " + curFile + " before rename");
- if (!ckptFile.renameTo(curFile)) {
- throw new IOException("Unable to rename " + ckptFile +
- " to " + curFile);
- }
+ throw new IOException("Unable to rename " + ckptFile +
+ " to " + curFile);
}
}
}
- return needToSave;
- }
-
- @Override
- File getImageFile() {
- return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
}
-
- @Override
- List<File> getEditsFiles() {
- if (latestNameCheckpointTime > latestEditsCheckpointTime) {
- // the image is already current, discard edits
- LOG.debug(
- "Name checkpoint time is newer than edits, not loading edits.");
- return Collections.<File>emptyList();
- }
-
- return getEditsInStorageDir(latestEditsSD);
- }
-
- @Override
- StorageDirectory getStorageDirectoryForProperties() {
- return latestNameSD;
- }
+ return needToSave;
}
-
+
/**
* @return a list with the paths to EDITS and EDITS_NEW (if it exists)
* in a given storage directory.
@@ -269,4 +249,33 @@ class FSImagePreTransactionalStorageInsp
}
return files;
}
+
+ private List<File> getLatestEditsFiles() {
+ if (latestNameCheckpointTime > latestEditsCheckpointTime) {
+ // the image is already current, discard edits
+ LOG.debug(
+ "Name checkpoint time is newer than edits, not loading edits.");
+ return Collections.<File>emptyList();
+ }
+
+ return getEditsInStorageDir(latestEditsSD);
+ }
+
+ @Override
+ long getMaxSeenTxId() {
+ return 0L;
+ }
+
+ static Iterable<EditLogInputStream> getEditLogStreams(NNStorage storage)
+ throws IOException {
+ FSImagePreTransactionalStorageInspector inspector
+ = new FSImagePreTransactionalStorageInspector();
+ storage.inspectStorageDirs(inspector);
+
+ List<EditLogInputStream> editStreams = new ArrayList<EditLogInputStream>();
+ for (File f : inspector.getLatestEditsFiles()) {
+ editStreams.add(new EditLogFileInputStream(f));
+ }
+ return editStreams;
+ }
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -43,11 +44,16 @@ abstract class FSImageStorageInspector {
abstract boolean isUpgradeFinalized();
/**
- * Create a plan to load the image from the set of inspected storage directories.
+ * Get the image file which should be loaded into the filesystem.
+ * @throws IOException if not enough files are available (e.g. no image found in any directory)
*/
- abstract LoadPlan createLoadPlan() throws IOException;
-
+ abstract FSImageFile getLatestImage() throws IOException;
+
+ /**
+ * Get the maximum transaction id seen by the inspected storage.
+ * Any load based on this set of images must include transactions
+ * at least up to this id.
+ */
+ abstract long getMaxSeenTxId();
+
/**
* @return true if the directories are in such a state that the image should be re-saved
* following the load
@@ -55,49 +61,6 @@ abstract class FSImageStorageInspector {
abstract boolean needToSave();
/**
- * A plan to load the namespace from disk, providing the locations from which to load
- * the image and a set of edits files.
- */
- abstract static class LoadPlan {
- /**
- * Execute atomic move sequence in the chosen storage directories,
- * in order to recover from an interrupted checkpoint.
- * @return true if some recovery action was taken
- */
- abstract boolean doRecovery() throws IOException;
-
- /**
- * @return the file from which to load the image data
- */
- abstract File getImageFile();
-
- /**
- * @return a list of flies containing edits to replay
- */
- abstract List<File> getEditsFiles();
-
- /**
- * @return the storage directory containing the VERSION file that should be
- * loaded.
- */
- abstract StorageDirectory getStorageDirectoryForProperties();
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Will load image file: ").append(getImageFile()).append("\n");
- sb.append("Will load edits files:").append("\n");
- for (File f : getEditsFiles()) {
- sb.append(" ").append(f).append("\n");
- }
- sb.append("Will load metadata from: ")
- .append(getStorageDirectoryForProperties())
- .append("\n");
- return sb.toString();
- }
- }
-
- /**
* Record of an image that has been located and had its filename parsed.
*/
static class FSImageFile {
@@ -106,7 +69,8 @@ abstract class FSImageStorageInspector {
private final File file;
FSImageFile(StorageDirectory sd, File file, long txId) {
- assert txId >= 0 : "Invalid txid on " + file +": " + txId;
+ assert txId >= 0 || txId == HdfsConstants.INVALID_TXID
+ : "Invalid txid on " + file +": " + txId;
this.sd = sd;
this.txId = txId;
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Sun Feb 26 23:32:06 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -52,9 +51,7 @@ class FSImageTransactionalStorageInspect
private boolean isUpgradeFinalized = true;
List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
- List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
- SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
- long maxSeenTxId = 0;
+ private long maxSeenTxId = 0;
private static final Pattern IMAGE_REGEX = Pattern.compile(
NameNodeFile.IMAGE.getName() + "_(\\d+)");
@@ -68,6 +65,8 @@ class FSImageTransactionalStorageInspect
return;
}
+ maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+
File currentDir = sd.getCurrentDir();
File filesInStorage[];
try {
@@ -110,34 +109,10 @@ class FSImageTransactionalStorageInspect
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
}
- List<EditLogFile> editLogs
- = FileJournalManager.matchEditLogs(filesInStorage);
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
- for (EditLogFile log : editLogs) {
- addEditLog(log);
- }
- } else if (!editLogs.isEmpty()){
- LOG.warn("Found the following edit log file(s) in " + sd +
- " even though it was not configured to store edits:\n" +
- " " + Joiner.on("\n ").join(editLogs));
-
- }
-
// set finalized flag
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
}
- private void addEditLog(EditLogFile foundEditLog) {
- foundEditLogs.add(foundEditLog);
- LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
- if (group == null) {
- group = new LogGroup(foundEditLog.getFirstTxId());
- logGroups.put(foundEditLog.getFirstTxId(), group);
- }
- group.add(foundEditLog);
- }
-
-
@Override
public boolean isUpgradeFinalized() {
return isUpgradeFinalized;
@@ -148,9 +123,13 @@ class FSImageTransactionalStorageInspect
* If there are multiple storage directories which contain equal images
* the storage directory that was inspected first will be preferred.
*
- * Returns null if no images were found.
+ * @throws FileNotFoundException if no images are found.
*/
- FSImageFile getLatestImage() {
+ FSImageFile getLatestImage() throws IOException {
+ if (foundImages.isEmpty()) {
+ throw new FileNotFoundException("No valid image files found");
+ }
+
FSImageFile ret = null;
for (FSImageFile img : foundImages) {
if (ret == null || img.txId > ret.txId) {
@@ -164,349 +143,13 @@ class FSImageTransactionalStorageInspect
return ImmutableList.copyOf(foundImages);
}
- public List<EditLogFile> getEditLogFiles() {
- return ImmutableList.copyOf(foundEditLogs);
- }
-
- @Override
- public LoadPlan createLoadPlan() throws IOException {
- if (foundImages.isEmpty()) {
- throw new FileNotFoundException("No valid image files found");
- }
-
- FSImageFile recoveryImage = getLatestImage();
- LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
-
- return new TransactionalLoadPlan(recoveryImage,
- logPlan);
- }
-
- /**
- * Plan which logs to load in order to bring the namespace up-to-date.
- * Transactions will be considered in the range (sinceTxId, maxTxId]
- *
- * @param sinceTxId the highest txid that is already loaded
- * (eg from the image checkpoint)
- * @param maxStartTxId ignore any log files that start after this txid
- */
- LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
- long expectedTxId = sinceTxId + 1;
-
- List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
-
- SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
- if (logGroups.size() > tailGroups.size()) {
- LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) +
- " groups of logs because they start with a txid less than image " +
- "txid " + sinceTxId);
- }
-
- SortedMap<Long, LogGroup> usefulGroups;
- if (maxStartTxId > sinceTxId) {
- usefulGroups = tailGroups.headMap(maxStartTxId);
- } else {
- usefulGroups = new TreeMap<Long, LogGroup>();
- }
-
- if (usefulGroups.size() > tailGroups.size()) {
- LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
- " groups of logs because they start with a txid higher than max " +
- "txid " + sinceTxId);
- }
-
-
- for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
- long logStartTxId = entry.getKey();
- LogGroup logGroup = entry.getValue();
-
- logGroup.planRecovery();
-
- if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) {
- throw new IOException("Expected next log group would start at txid " +
- expectedTxId + " but starts at txid " + logStartTxId);
- }
-
- // We can pick any of the non-corrupt logs here
- recoveryLogs.add(logGroup.getBestNonCorruptLog());
-
- // If this log group was finalized, we know to expect the next
- // log group to start at the following txid (ie no gaps)
- if (logGroup.hasKnownLastTxId()) {
- expectedTxId = logGroup.getLastTxId() + 1;
- } else {
- // the log group was in-progress so we don't know what ID
- // the next group should start from.
- expectedTxId = HdfsConstants.INVALID_TXID;
- }
- }
-
- long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
- 0 : usefulGroups.lastKey();
- if (maxSeenTxId > sinceTxId &&
- maxSeenTxId > lastLogGroupStartTxId) {
- String msg = "At least one storage directory indicated it has seen a " +
- "log segment starting at txid " + maxSeenTxId;
- if (usefulGroups.isEmpty()) {
- msg += " but there are no logs to load.";
- } else {
- msg += " but the most recent log file found starts with txid " +
- lastLogGroupStartTxId;
- }
- throw new IOException(msg);
- }
-
- return new LogLoadPlan(recoveryLogs,
- Lists.newArrayList(usefulGroups.values()));
-
- }
-
@Override
public boolean needToSave() {
return needToSave;
}
-
- /**
- * A group of logs that all start at the same txid.
- *
- * Handles determining which logs are corrupt and which should be considered
- * candidates for loading.
- */
- static class LogGroup {
- long startTxId;
- List<EditLogFile> logs = new ArrayList<EditLogFile>();;
- private Set<Long> endTxIds = new TreeSet<Long>();
- private boolean hasInProgress = false;
- private boolean hasFinalized = false;
-
- LogGroup(long startTxId) {
- this.startTxId = startTxId;
- }
-
- EditLogFile getBestNonCorruptLog() {
- // First look for non-corrupt finalized logs
- for (EditLogFile log : logs) {
- if (!log.isCorrupt() && !log.isInProgress()) {
- return log;
- }
- }
- // Then look for non-corrupt in-progress logs
- for (EditLogFile log : logs) {
- if (!log.isCorrupt()) {
- return log;
- }
- }
- // We should never get here, because we don't get to the planning stage
- // without calling planRecovery first, and if we've called planRecovery,
- // we would have already thrown if there were no non-corrupt logs!
- throw new IllegalStateException(
- "No non-corrupt logs for txid " + startTxId);
- }
-
- /**
- * @return true if we can determine the last txid in this log group.
- */
- boolean hasKnownLastTxId() {
- for (EditLogFile log : logs) {
- if (!log.isInProgress()) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * @return the last txid included in the logs in this group
- * @throws IllegalStateException if it is unknown -
- * {@see #hasKnownLastTxId()}
- */
- long getLastTxId() {
- for (EditLogFile log : logs) {
- if (!log.isInProgress()) {
- return log.getLastTxId();
- }
- }
- throw new IllegalStateException("LogGroup only has in-progress logs");
- }
-
-
- void add(EditLogFile log) {
- assert log.getFirstTxId() == startTxId;
- logs.add(log);
-
- if (log.isInProgress()) {
- hasInProgress = true;
- } else {
- hasFinalized = true;
- endTxIds.add(log.getLastTxId());
- }
- }
-
- void planRecovery() throws IOException {
- assert hasInProgress || hasFinalized;
-
- checkConsistentEndTxIds();
-
- if (hasFinalized && hasInProgress) {
- planMixedLogRecovery();
- } else if (!hasFinalized && hasInProgress) {
- planAllInProgressRecovery();
- } else if (hasFinalized && !hasInProgress) {
- LOG.debug("No recovery necessary for logs starting at txid " +
- startTxId);
- }
- }
-
- /**
- * Recovery case for when some logs in the group were in-progress, and
- * others were finalized. This happens when one of the storage
- * directories fails.
- *
- * The in-progress logs in this case should be considered corrupt.
- */
- private void planMixedLogRecovery() throws IOException {
- for (EditLogFile log : logs) {
- if (log.isInProgress()) {
- LOG.warn("Log at " + log.getFile() + " is in progress, but " +
- "other logs starting at the same txid " + startTxId +
- " are finalized. Moving aside.");
- log.markCorrupt();
- }
- }
- }
-
- /**
- * Recovery case for when all of the logs in the group were in progress.
- * This happens if the NN completely crashes and restarts. In this case
- * we check the non-zero lengths of each log file, and any logs that are
- * less than the max of these lengths are considered corrupt.
- */
- private void planAllInProgressRecovery() throws IOException {
- // We only have in-progress logs. We need to figure out which logs have
- // the latest data to reccover them
- LOG.warn("Logs beginning at txid " + startTxId + " were are all " +
- "in-progress (probably truncated due to a previous NameNode " +
- "crash)");
- if (logs.size() == 1) {
- // Only one log, it's our only choice!
- EditLogFile log = logs.get(0);
- if (log.validateLog().numTransactions == 0) {
- // If it has no transactions, we should consider it corrupt just
- // to be conservative.
- // See comment below for similar case
- LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
- "it has no transactions in it.");
- log.markCorrupt();
- }
- return;
- }
-
- long maxValidTxnCount = Long.MIN_VALUE;
- for (EditLogFile log : logs) {
- long validTxnCount = log.validateLog().numTransactions;
- LOG.warn(" Log " + log.getFile() +
- " valid txns=" + validTxnCount +
- " valid len=" + log.validateLog().validLength);
- maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
- }
-
- for (EditLogFile log : logs) {
- long txns = log.validateLog().numTransactions;
- if (txns < maxValidTxnCount) {
- LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
- "it is has only " + txns + " valid txns whereas another " +
- "log has " + maxValidTxnCount);
- log.markCorrupt();
- } else if (txns == 0) {
- // this can happen if the NN crashes right after rolling a log
- // but before the START_LOG_SEGMENT txn is written. Since the log
- // is empty, we can just move it aside to its corrupt name.
- LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
- "it has no transactions in it.");
- log.markCorrupt();
- }
- }
- }
-
- /**
- * Check for the case when we have multiple finalized logs and they have
- * different ending transaction IDs. This violates an invariant that all
- * log directories should roll together. We should abort in this case.
- */
- private void checkConsistentEndTxIds() throws IOException {
- if (hasFinalized && endTxIds.size() > 1) {
- throw new IOException("More than one ending txid was found " +
- "for logs starting at txid " + startTxId + ". " +
- "Found: " + StringUtils.join(endTxIds, ','));
- }
- }
-
- void recover() throws IOException {
- for (EditLogFile log : logs) {
- if (log.isCorrupt()) {
- log.moveAsideCorruptFile();
- } else if (log.isInProgress()) {
- log.finalizeLog();
- }
- }
- }
- }
-
- static class TransactionalLoadPlan extends LoadPlan {
- final FSImageFile image;
- final LogLoadPlan logPlan;
-
- public TransactionalLoadPlan(FSImageFile image,
- LogLoadPlan logPlan) {
- super();
- this.image = image;
- this.logPlan = logPlan;
- }
-
- @Override
- boolean doRecovery() throws IOException {
- logPlan.doRecovery();
- return false;
- }
-
- @Override
- File getImageFile() {
- return image.getFile();
- }
-
- @Override
- List<File> getEditsFiles() {
- return logPlan.getEditsFiles();
- }
-
- @Override
- StorageDirectory getStorageDirectoryForProperties() {
- return image.sd;
- }
- }
-
- static class LogLoadPlan {
- final List<EditLogFile> editLogs;
- final List<LogGroup> logGroupsToRecover;
-
- LogLoadPlan(List<EditLogFile> editLogs,
- List<LogGroup> logGroupsToRecover) {
- this.editLogs = editLogs;
- this.logGroupsToRecover = logGroupsToRecover;
- }
-
- public void doRecovery() throws IOException {
- for (LogGroup g : logGroupsToRecover) {
- g.recover();
- }
- }
-
- public List<File> getEditsFiles() {
- List<File> ret = new ArrayList<File>();
- for (EditLogFile log : editLogs) {
- ret.add(log.getFile());
- }
- return ret;
- }
+ @Override
+ long getMaxSeenTxId() {
+ return maxSeenTxId;
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Feb 26 23:32:06 2012
@@ -173,6 +173,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
/***************************************************
@@ -297,12 +298,43 @@ public class FSNamesystem implements Nam
// lock to protect FSNamesystem.
private ReentrantReadWriteLock fsLock;
+
+ /**
+ * Instantiates an FSNamesystem loaded from the image and edits
+ * directories specified in the passed Configuration.
+ *
+ * @param conf the Configuration which specifies the storage directories
+ * from which to load
+ * @return an FSNamesystem which contains the loaded namespace
+ * @throws IOException if loading fails
+ */
+ public static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
+ FSImage fsImage = new FSImage(conf);
+ FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+
+ long loadStart = now();
+ StartupOption startOpt = NameNode.getStartupOption(conf);
+ namesystem.loadFSImage(startOpt, fsImage);
+ long timeTakenToLoadFSImage = now() - loadStart;
+ LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
+ NameNode.getNameNodeMetrics().setFsImageLoadTime(
+ (int) timeTakenToLoadFSImage);
+ return namesystem;
+ }
+
/**
- * FSNamesystem constructor.
+ * Create an FSNamesystem associated with the specified image.
+ *
+ * Note that this does not load any data off of disk -- if you would
+ * like that behavior, use {@link #loadFromDisk(Configuration)}.
+ *
+ * @param conf configuration
+ * @param fsImage the FSImage to associate with
+ * @throws IOException on bad configuration
*/
- FSNamesystem(Configuration conf) throws IOException {
+ FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
try {
- initialize(conf, null);
+ initialize(conf, fsImage);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
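The new loadFromDisk factory above separates construction from disk I/O:
the constructor only associates the namesystem with an FSImage, while
loadFromDisk performs the actual image and edit-log load. A minimal
caller sketch (hypothetical snippet, not part of this commit, relying
only on the public signature shown above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

    Configuration conf = new Configuration();
    // Builds an FSImage, constructs the namesystem, then loads the
    // image and edits from the directories named in conf.
    FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);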
@@ -318,29 +350,41 @@ public class FSNamesystem implements Nam
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
- nnResourceChecker = new NameNodeResourceChecker(conf);
- checkAvailableResources();
this.systemStart = now();
this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
- this.registerMBean(); // register the MBean for the FSNamesystemState
- if(fsImage == null) {
- this.dir = new FSDirectory(this, conf);
- StartupOption startOpt = NameNode.getStartupOption(conf);
- this.dir.loadFSImage(startOpt);
- long timeTakenToLoadFSImage = now() - systemStart;
- LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
- NameNode.getNameNodeMetrics().setFsImageLoadTime(
- (int) timeTakenToLoadFSImage);
- } else {
- this.dir = new FSDirectory(fsImage, this, conf);
- }
+ this.dir = new FSDirectory(fsImage, this, conf);
this.safeMode = new SafeModeInfo(conf);
}
+ void loadFSImage(StartupOption startOpt, FSImage fsImage)
+ throws IOException {
+ // format before starting up if requested
+ if (startOpt == StartupOption.FORMAT) {
+ fsImage.format(this, fsImage.getStorage().determineClusterId()); // reuse current id
+
+ startOpt = StartupOption.REGULAR;
+ }
+ boolean success = false;
+ try {
+ if (fsImage.recoverTransitionRead(startOpt, this)) {
+ fsImage.saveNamespace(this);
+ }
+ fsImage.openEditLog();
+
+ success = true;
+ } finally {
+ if (!success) {
+ fsImage.close();
+ }
+ }
+ dir.imageLoadComplete();
+ }
+
void activateSecretManager() throws IOException {
if (dtSecretManager != null) {
dtSecretManager.startThreads();
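loadFSImage above guards the load with a success flag so that the
FSImage is closed only if loading fails partway through. The idiom in
isolation (generic sketch; doLoad and resource are stand-ins, not
HDFS APIs):

    boolean success = false;
    try {
        doLoad();             // any step in here may throw
        success = true;       // reached only if every step completed
    } finally {
        if (!success) {
            resource.close(); // release on failure, keep open on success
        }
    }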
@@ -351,8 +395,13 @@ public class FSNamesystem implements Nam
* Activate FSNamesystem daemons.
*/
void activate(Configuration conf) throws IOException {
+ this.registerMBean(); // register the MBean for the FSNamesystemState
+
writeLock();
try {
+ nnResourceChecker = new NameNodeResourceChecker(conf);
+ checkAvailableResources();
+
setBlockTotal();
blockManager.activate(conf);
@@ -436,37 +485,6 @@ public class FSNamesystem implements Nam
}
/**
- * dirs is a list of directories where the filesystem directory state
- * is stored
- */
- FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
- this.fsLock = new ReentrantReadWriteLock(true);
- this.blockManager = new BlockManager(this, this, conf);
- setConfigurationParameters(conf);
- this.dir = new FSDirectory(fsImage, this, conf);
- dtSecretManager = createDelegationTokenSecretManager(conf);
- }
-
- /**
- * Create FSNamesystem for {@link BackupNode}.
- * Should do everything that would be done for the NameNode,
- * except for loading the image.
- *
- * @param bnImage {@link BackupImage}
- * @param conf configuration
- * @throws IOException
- */
- FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException {
- try {
- initialize(conf, bnImage);
- } catch(IOException e) {
- LOG.error(getClass().getSimpleName() + " initialization failed.", e);
- close();
- throw e;
- }
- }
-
- /**
* Initializes some of the members from configuration
*/
private void setConfigurationParameters(Configuration conf)
@@ -514,16 +532,23 @@ public class FSNamesystem implements Nam
NamespaceInfo getNamespaceInfo() {
readLock();
try {
- return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
- getClusterId(), getBlockPoolId(),
- dir.fsImage.getStorage().getCTime(),
- upgradeManager.getUpgradeVersion());
+ return unprotectedGetNamespaceInfo();
} finally {
readUnlock();
}
}
/**
+ * Version of {@link #getNamespaceInfo()} that is not protected by a lock.
+ */
+ NamespaceInfo unprotectedGetNamespaceInfo() {
+ return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+ getClusterId(), getBlockPoolId(),
+ dir.fsImage.getStorage().getCTime(),
+ upgradeManager.getUpgradeVersion());
+ }
+
+ /**
* Close down this file system manager.
* Causes heartbeat and lease daemons to stop; waits briefly for
* them to finish, but a short timeout returns control back to caller.
@@ -2567,6 +2592,8 @@ public class FSNamesystem implements Nam
* @throws IOException
*/
private void checkAvailableResources() throws IOException {
+ Preconditions.checkState(nnResourceChecker != null,
+ "nnResourceChecker not initialized");
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
@@ -2752,7 +2779,7 @@ public class FSNamesystem implements Nam
throw new IOException("Safe mode should be turned ON " +
"in order to create namespace image.");
}
- getFSImage().saveNamespace();
+ getFSImage().saveNamespace(this);
LOG.info("New namespace image has been created.");
} finally {
readUnlock();
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Sun Feb 26 23:32:06 2012
@@ -80,7 +80,7 @@ public class FileChecksumServlets {
dtParam + addrParam);
}
- /** {@inheritDoc} */
+ @Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final ServletContext context = getServletContext();
@@ -104,7 +104,7 @@ public class FileChecksumServlets {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
- /** {@inheritDoc} */
+ @Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final PrintWriter out = response.getWriter();
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Sun Feb 26 23:32:06 2012
@@ -23,11 +23,14 @@ import org.apache.commons.logging.LogFac
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.HashMap;
import java.util.Comparator;
+import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
@@ -57,6 +60,9 @@ class FileJournalManager implements Jour
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+ private File currentInProgress = null;
+ private long maxSeenTransaction = 0L;
+
@VisibleForTesting
StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
@@ -66,19 +72,20 @@ class FileJournalManager implements Jour
}
@Override
- public EditLogOutputStream startLogSegment(long txid) throws IOException {
- File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
- EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+ synchronized public EditLogOutputStream startLogSegment(long txid)
+ throws IOException {
+ currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+ EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
outputBufferCapacity);
stm.create();
return stm;
}
@Override
- public void finalizeLogSegment(long firstTxId, long lastTxId)
+ synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
- File inprogressFile = NNStorage.getInProgressEditsFile(
- sd, firstTxId);
+ File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
+
File dstFile = NNStorage.getFinalizedEditsFile(
sd, firstTxId, lastTxId);
LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
@@ -89,6 +96,9 @@ class FileJournalManager implements Jour
if (!inprogressFile.renameTo(dstFile)) {
throw new IOException("Unable to finalize edits file " + inprogressFile);
}
+ if (inprogressFile.equals(currentInProgress)) {
+ currentInProgress = null;
+ }
}
@VisibleForTesting
@@ -97,12 +107,7 @@ class FileJournalManager implements Jour
}
@Override
- public String toString() {
- return "FileJournalManager for storage directory " + sd;
- }
-
- @Override
- public void setOutputBufferCapacity(int size) {
+ synchronized public void setOutputBufferCapacity(int size) {
this.outputBufferCapacity = size;
}
@@ -120,13 +125,6 @@ class FileJournalManager implements Jour
}
}
- @Override
- public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
- throws IOException {
- File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
- return new EditLogFileInputStream(f);
- }
-
/**
* Find all editlog segments starting at or above the given txid.
* @param fromTxId the txnid which to start looking
@@ -178,17 +176,156 @@ class FileJournalManager implements Jour
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
- new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
+ new EditLogFile(f, startTxId, startTxId, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
- }
+ }
}
}
return ret;
}
+ @Override
+ synchronized public EditLogInputStream getInputStream(long fromTxId)
+ throws IOException {
+ for (EditLogFile elf : getLogFiles(fromTxId)) {
+ if (elf.getFirstTxId() == fromTxId) {
+ if (elf.isInProgress()) {
+ elf.validateLog();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Returning edit stream reading from " + elf);
+ }
+ return new EditLogFileInputStream(elf.getFile(),
+ elf.getFirstTxId(), elf.getLastTxId());
+ }
+ }
+
+ throw new IOException("Cannot find editlog file with " + fromTxId
+ + " as first first txid");
+ }
+
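getInputStream above only matches a segment whose first transaction id
equals fromTxId, and getLogFiles (further down) rejects a txid that
falls inside a segment. A hypothetical illustration (fjm and the txid
values are invented):

    // Storage directory holds one finalized segment, edits_1-100.
    FileJournalManager fjm = ...; // hypothetical instance
    EditLogInputStream in = fjm.getInputStream(1);  // ok: txid 1 starts
                                                    // a segment
    fjm.getInputStream(50); // throws IOException: txid 50 falls in the
                            // middle of the segment, not at its start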
+ @Override
+ public long getNumberOfTransactions(long fromTxId)
+ throws IOException, CorruptionException {
+ long numTxns = 0L;
+
+ for (EditLogFile elf : getLogFiles(fromTxId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Counting " + elf);
+ }
+ if (elf.getFirstTxId() > fromTxId) { // there must be a gap
+ LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+ + fromTxId + " - " + (elf.getFirstTxId() - 1));
+ break;
+ } else if (fromTxId == elf.getFirstTxId()) {
+ if (elf.isInProgress()) {
+ elf.validateLog();
+ }
+
+ if (elf.isCorrupt()) {
+ break;
+ }
+ // advance fromTxId past this segment; the difference below is
+ // then the full transaction count of the segment
+ fromTxId = elf.getLastTxId() + 1;
+ numTxns += fromTxId - elf.getFirstTxId();
+
+ if (elf.isInProgress()) {
+ break;
+ }
+ } // else skip
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Journal " + this + " has " + numTxns
+ + " txns from " + fromTxId);
+ }
+
+ long max = findMaxTransaction();
+ // fromTxId should be greater than max, as it points to the next
+ // transaction we should expect to find. If it is less than or equal
+ // to max, it means that a transaction with txid == max has not been found
+ if (numTxns == 0 && fromTxId <= max) {
+ String error = String.format("Gap in transactions, max txnid is %d"
+ + ", 0 txns from %d", max, fromTxId);
+ LOG.error(error);
+ throw new CorruptionException(error);
+ }
+
+ return numTxns;
+ }
+
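To make the gap handling in getNumberOfTransactions concrete, consider
a hypothetical layout (fjm and all txid values are invented):

    // Finalized segments edits_1-100 and edits_201-300 exist on disk.
    long n = fjm.getNumberOfTransactions(1);
    // n == 100: the count stops at the gap 101-200 and a warning is
    // logged, but some transactions were found, so no exception.
    fjm.getNumberOfTransactions(101);
    // No segment starts at txid 101, so numTxns stays 0, while
    // findMaxTransaction() returns 300. Since 101 <= 300, a
    // CorruptionException is thrown: later transactions exist but
    // cannot be reached from txid 101.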
+ @Override
+ synchronized public void recoverUnfinalizedSegments() throws IOException {
+ File currentDir = sd.getCurrentDir();
+ List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+
+ // make sure journal is aware of max seen transaction before moving corrupt
+ // files aside
+ findMaxTransaction();
+
+ for (EditLogFile elf : allLogFiles) {
+ if (elf.getFile().equals(currentInProgress)) {
+ continue;
+ }
+ if (elf.isInProgress()) {
+ elf.validateLog();
+
+ if (elf.isCorrupt()) {
+ elf.moveAsideCorruptFile();
+ continue;
+ }
+ finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
+ }
+ }
+ }
+
+ private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+ File currentDir = sd.getCurrentDir();
+ List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+ List<EditLogFile> logFiles = Lists.newArrayList();
+
+ for (EditLogFile elf : allLogFiles) {
+ if (fromTxId > elf.getFirstTxId()
+ && fromTxId <= elf.getLastTxId()) {
+ throw new IOException("Asked for fromTxId " + fromTxId
+ + " which is in middle of file " + elf.file);
+ }
+ if (fromTxId <= elf.getFirstTxId()) {
+ logFiles.add(elf);
+ }
+ }
+
+ Collections.sort(logFiles, EditLogFile.COMPARE_BY_START_TXID);
+
+ return logFiles;
+ }
+
+ /**
+ * Find the maximum transaction in the journal.
+ * The result is stored in a member variable, because corrupt edit
+ * logs may be moved aside, yet we still need to remember their first
+ * transaction id in case it was the maximum transaction in
+ * the journal.
+ */
+ private long findMaxTransaction()
+ throws IOException {
+ for (EditLogFile elf : getLogFiles(0)) {
+ if (elf.isInProgress()) {
+ maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
+ elf.validateLog();
+ }
+ maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
+ }
+ return maxSeenTransaction;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("FileJournalManager(root=%s)", sd.getRoot());
+ }
+
/**
* Record of an edit log that has been located and had its filename parsed.
*/
@@ -196,12 +333,10 @@ class FileJournalManager implements Jour
private File file;
private final long firstTxId;
private long lastTxId;
-
- private EditLogValidation cachedValidation = null;
+
private boolean isCorrupt = false;
-
- static final long UNKNOWN_END = -1;
-
+ private final boolean isInProgress;
+
final static Comparator<EditLogFile> COMPARE_BY_START_TXID
= new Comparator<EditLogFile>() {
public int compare(EditLogFile a, EditLogFile b) {
@@ -214,30 +349,24 @@ class FileJournalManager implements Jour
EditLogFile(File file,
long firstTxId, long lastTxId) {
- assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
- assert firstTxId > 0;
+ this(file, firstTxId, lastTxId, false);
+ assert (lastTxId != HdfsConstants.INVALID_TXID)
+ && (lastTxId >= firstTxId);
+ }
+
+ EditLogFile(File file, long firstTxId,
+ long lastTxId, boolean isInProgress) {
+ assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress)
+ || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId);
+ assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
+ this.isInProgress = isInProgress;
}
- public void finalizeLog() throws IOException {
- long numTransactions = validateLog().numTransactions;
- long lastTxId = firstTxId + numTransactions - 1;
- File dst = new File(file.getParentFile(),
- NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId));
- LOG.info("Finalizing edits log " + file + " by renaming to "
- + dst.getName());
- if (!file.renameTo(dst)) {
- throw new IOException("Couldn't finalize log " +
- file + " to " + dst);
- }
- this.lastTxId = lastTxId;
- file = dst;
- }
-
long getFirstTxId() {
return firstTxId;
}
@@ -246,15 +375,22 @@ class FileJournalManager implements Jour
return lastTxId;
}
- EditLogValidation validateLog() throws IOException {
- if (cachedValidation == null) {
- cachedValidation = EditLogFileInputStream.validateEditLog(file);
+ /**
+ * Count the number of valid transactions in a log.
+ * This will update the lastTxId of the EditLogFile, or
+ * mark the log as corrupt if it contains no valid transactions.
+ */
+ void validateLog() throws IOException {
+ EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+ if (val.getNumTransactions() == 0) {
+ markCorrupt();
+ } else {
+ this.lastTxId = val.getEndTxId();
}
- return cachedValidation;
}
boolean isInProgress() {
- return (lastTxId == UNKNOWN_END);
+ return isInProgress;
}
File getFile() {
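The reworked validateLog above replaces the cached EditLogValidation
with in-place mutation of the EditLogFile. A sketch of the resulting
contract (elf stands for a hypothetical in-progress segment):

    elf.validateLog();                 // scans the file once
    if (elf.isCorrupt()) {
        // no valid transactions were found; the caller may move the
        // file aside, as recoverUnfinalizedSegments does above
        elf.moveAsideCorruptFile();
    } else {
        long last = elf.getLastTxId(); // now the last valid txid
    }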
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sun Feb 26 23:32:06 2012
@@ -261,13 +261,13 @@ public abstract class INode implements C
this.name = name;
}
- /** {@inheritDoc} */
+ @Override
public String getFullPathName() {
// Get the full path name of this inode.
return FSDirectory.getFullPathName(this);
}
- /** {@inheritDoc} */
+ @Override
public String toString() {
return "\"" + getFullPathName() + "\":"
+ getUserName() + ":" + getGroupName() + ":"
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Sun Feb 26 23:32:06 2012
@@ -372,7 +372,7 @@ class INodeDirectory extends INode {
return parent;
}
- /** {@inheritDoc} */
+ @Override
DirCounts spaceConsumedInTree(DirCounts counts) {
counts.nsCount += 1;
if (children != null) {
@@ -383,7 +383,7 @@ class INodeDirectory extends INode {
return counts;
}
- /** {@inheritDoc} */
+ @Override
long[] computeContentSummary(long[] summary) {
// Walk through the children of this node, using a new summary array
// for the (sub)tree rooted at this node
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sun Feb 26 23:32:06 2012
@@ -168,7 +168,7 @@ public class INodeFile extends INode {
return 1;
}
- /** {@inheritDoc} */
+ @Override
long[] computeContentSummary(long[] summary) {
summary[0] += computeFileSize(true);
summary[1]++;