Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/08/30 01:13:13 UTC
svn commit: r690418 [1/2] - in /hadoop/core/trunk: ./ conf/
src/hdfs/org/apache/hadoop/hdfs/server/common/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/ src/test/org/a...
Author: shv
Date: Fri Aug 29 16:13:12 2008
New Revision: 690418
URL: http://svn.apache.org/viewvc?rev=690418&view=rev
Log:
HADOOP-3948. Separate name-node edits and fsimage directories. Contributed by Lohit Vijayarenu.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 29 16:13:12 2008
@@ -111,6 +111,9 @@
HADOOP-3828. Provides a way to write skipped records to DFS.
(Sharad Agarwal via ddas)
+ HADOOP-3948. Separate name-node edits and fsimage directories.
+ (Lohit Vijayarenu via shv)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if libhdfs.so doesn't exist.
@@ -626,7 +629,7 @@
(rangadi)
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
- (Lohit Vjayarenu via rangadi)
+ (Lohit Vijayarenu via rangadi)
HADOOP-3130. Make the connect timeout smaller for getFile.
(Amar Ramesh Kamat via ddas)
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Aug 29 16:13:12 2008
@@ -242,13 +242,24 @@
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
- name node should store the temporary images and edits to merge.
+ name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
<property>
+ <name>fs.checkpoint.edits.dir</name>
+ <value>${fs.checkpoint.dir}</value>
+ <description>Determines where on the local filesystem the DFS secondary
+ name node should store the temporary edits to merge.
+ If this is a comma-delimited list of directories then the edits are
+ replicated in all of the directories for redundancy.
+ The default value is the same as fs.checkpoint.dir.
+ </description>
+</property>
+
+<property>
<name>fs.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
@@ -380,12 +391,21 @@
<name>dfs.name.dir</name>
<value>${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
- should store the name table. If this is a comma-delimited list
+ should store the name table (fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
directories, for redundancy. </description>
</property>
<property>
+ <name>dfs.name.edits.dir</name>
+ <value>${dfs.name.dir}</value>
+ <description>Determines where on the local filesystem the DFS name node
+ should store the transaction (edits) file. If this is a comma-delimited list
+ of directories then the transaction file is replicated in all of the
+ directories, for redundancy. The default value is the same as dfs.name.dir.
+ </description>
+</property>
+<property>
<name>dfs.web.ugi</name>
<value>webuser,webgroup</value>
<description>The user account used by the web interface.
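With the two new keys and their existing counterparts, a deployment can place the fsimage and the edit log on different devices. A minimal sketch of the idea (the key names come from this patch; the paths are hypothetical):

  import org.apache.hadoop.conf.Configuration;

  public class SeparateDirsExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Two fsimage replicas on separate disks; the edit log on a third.
      conf.set("dfs.name.dir", "/disk1/dfs/name,/disk2/dfs/name");
      conf.set("dfs.name.edits.dir", "/disk3/dfs/edits");
      // The secondary name-node gets the analogous split for its checkpoint.
      conf.set("fs.checkpoint.dir", "/disk1/dfs/namesecondary");
      conf.set("fs.checkpoint.edits.dir", "/disk3/dfs/edits-secondary");
      // Left unset, the edits keys default to ${dfs.name.dir} and
      // ${fs.checkpoint.dir}, preserving the old single-directory layout.
    }
  }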
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Fri Aug 29 16:13:12 2008
@@ -97,22 +97,117 @@
NORMAL;
}
+ /**
+ * An interface to denote storage directory type.
+ * Implementations can define a type for a storage directory by implementing
+ * this interface.
+ */
+ public interface StorageDirType {
+ public StorageDirType getStorageDirType();
+ public boolean isOfType(StorageDirType type);
+ }
+
private NodeType storageType; // Type of the node using this storage
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
+ private class DirIterator implements Iterator<StorageDirectory> {
+ StorageDirType dirType;
+ int prevIndex; // for remove()
+ int nextIndex; // for next()
+
+ DirIterator(StorageDirType dirType) {
+ this.dirType = dirType;
+ this.nextIndex = 0;
+ this.prevIndex = 0;
+ }
+
+ public boolean hasNext() {
+ if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
+ return false;
+ if (dirType != null) {
+ while (nextIndex < storageDirs.size()) {
+ if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+ break;
+ nextIndex++;
+ }
+ if (nextIndex >= storageDirs.size())
+ return false;
+ }
+ return true;
+ }
+
+ public StorageDirectory next() {
+ StorageDirectory sd = getStorageDir(nextIndex);
+ prevIndex = nextIndex;
+ nextIndex++;
+ if (dirType != null) {
+ while (nextIndex < storageDirs.size()) {
+ if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+ break;
+ nextIndex++;
+ }
+ }
+ return sd;
+ }
+
+ public void remove() {
+ nextIndex = prevIndex; // restore previous state
+ storageDirs.remove(prevIndex); // remove last returned element
+ hasNext(); // reset nextIndex to correct place
+ }
+ }
+
+ /**
+ * Return the default iterator.
+ * This iterator returns all entries of storageDirs.
+ */
+ public Iterator<StorageDirectory> dirIterator() {
+ return dirIterator(null);
+ }
+
+ /**
+ * Return an iterator based on storage directory type.
+ * This iterator selects entries of storageDirs of type dirType and returns
+ * them via the Iterator.
+ */
+ public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
+ return new DirIterator(dirType);
+ }
+
/**
* One of the storage directories.
*/
public class StorageDirectory {
- public File root; // root directory
+ File root; // root directory
FileLock lock; // storage lock
+ StorageDirType dirType; // storage dir type
public StorageDirectory(File dir) {
+ // default dirType is null
+ this(dir, null);
+ }
+
+ public StorageDirectory(File dir, StorageDirType dirType) {
this.root = dir;
this.lock = null;
+ this.dirType = dirType;
+ }
+
+ /**
+ * Get root directory of this storage
+ */
+ public File getRoot() {
+ return root;
}
/**
+ * Get storage directory type
+ */
+ public StorageDirType getStorageDirType() {
+ return dirType;
+ }
+
+ /**
* Read version file.
*
* @throws IOException if file cannot be read or contains inconsistent data
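The iterator added above lets callers walk storage directories by role instead of by index. A minimal sketch of a caller (a hypothetical helper, not part of this patch):

  import java.util.Iterator;
  import org.apache.hadoop.hdfs.server.common.Storage;
  import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
  import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;

  public class StorageDirLister {
    // A null type iterates every directory; a non-null type yields only
    // directories whose getStorageDirType().isOfType(type) holds.
    static void list(Storage storage, StorageDirType type) {
      for (Iterator<StorageDirectory> it = storage.dirIterator(type);
           it.hasNext(); ) {
        StorageDirectory sd = it.next();
        System.out.println(sd.getRoot() + " : " + sd.getStorageDirType());
      }
    }
  }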
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Aug 29 16:13:12 2008
@@ -179,14 +179,14 @@
if (ssid == null ||
!("".equals(storageID) || "".equals(ssid) ||
storageID.equals(ssid)))
- throw new InconsistentFSStateException(sd.root,
+ throw new InconsistentFSStateException(sd.getRoot(),
"has incompatible storage Id.");
if ("".equals(storageID)) // update id only if it was empty
storageID = ssid;
}
public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
- File oldF = new File(sd.root, "storage");
+ File oldF = new File(sd.getRoot(), "storage");
if (!oldF.exists())
return false;
// check the layout version inside the storage file
@@ -230,7 +230,7 @@
"Future version is not allowed";
if (getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException(
- "Incompatible namespaceIDs in " + sd.root.getCanonicalPath()
+ "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
+ ": namenode namespaceID = " + nsInfo.getNamespaceID()
+ "; datanode namespaceID = " + getNamespaceID());
if (this.layoutVersion == FSConstants.LAYOUT_VERSION
@@ -262,7 +262,7 @@
void doUpgrade(StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
- LOG.info("Upgrading storage directory " + sd.root
+ LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion()
+ "; old CTime = " + this.getCTime()
+ ".\n new LV = " + nsInfo.getLayoutVersion()
@@ -287,7 +287,7 @@
sd.write();
// rename tmp to previous
rename(tmpDir, prevDir);
- LOG.info("Upgrade of " + sd.root + " is complete.");
+ LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
}
void doRollback( StorageDirectory sd,
@@ -298,19 +298,19 @@
if (!prevDir.exists())
return;
DataStorage prevInfo = new DataStorage();
- StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.root);
+ StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
prevSD.read(prevSD.getPreviousVersionFile());
// We allow rollback to a state, which is either consistent with
// the namespace state or can be further upgraded to it.
if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
&& prevInfo.getCTime() <= nsInfo.getCTime())) // cannot rollback
- throw new InconsistentFSStateException(prevSD.root,
+ throw new InconsistentFSStateException(prevSD.getRoot(),
"Cannot rollback to a newer state.\nDatanode previous state: LV = "
+ prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
+ " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
- LOG.info("Rolling back storage directory " + sd.root
+ LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n target LV = " + nsInfo.getLayoutVersion()
+ "; target CTime = " + nsInfo.getCTime());
File tmpDir = sd.getRemovedTmp();
@@ -323,14 +323,14 @@
rename(prevDir, curDir);
// delete tmp dir
deleteDir(tmpDir);
- LOG.info("Rollback of " + sd.root + " is complete.");
+ LOG.info("Rollback of " + sd.getRoot() + " is complete.");
}
void doFinalize(StorageDirectory sd) throws IOException {
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
return; // already discarded
- final String dataDirPath = sd.root.getCanonicalPath();
+ final String dataDirPath = sd.getRoot().getCanonicalPath();
LOG.info("Finalizing upgrade for storage directory "
+ dataDirPath
+ ".\n cur LV = " + this.getLayoutVersion()
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Aug 29 16:13:12 2008
@@ -54,7 +54,8 @@
/** Access an existing dfs name directory. */
public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this(new FSImage(), ns, conf);
- fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null));
+ fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
+ FSImage.getCheckpointEditsDirs(conf, null));
}
public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
@@ -73,22 +74,23 @@
}
void loadFSImage(Collection<File> dataDirs,
+ Collection<File> editsDirs,
StartupOption startOpt) throws IOException {
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
- fsImage.setStorageDirectories(dataDirs);
+ fsImage.setStorageDirectories(dataDirs, editsDirs);
fsImage.format();
startOpt = StartupOption.REGULAR;
}
try {
- if (fsImage.recoverTransitionRead(dataDirs, startOpt)) {
+ if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
fsImage.saveFSImage();
}
FSEditLog editLog = fsImage.getEditLog();
assert editLog != null : "editLog must be initialized";
if (!editLog.isOpen())
editLog.open();
- fsImage.setCheckpointDirectories(null);
+ fsImage.setCheckpointDirectories(null, null);
} catch(IOException e) {
fsImage.close();
throw e;
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Aug 29 16:13:12 2008
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
+import java.util.Iterator;
import java.lang.Math;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
@@ -36,6 +37,8 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.permission.*;
@@ -231,6 +234,13 @@
" at offset " + newsize);
}
}
+
+ /**
+ * Returns the file associated with this stream
+ */
+ File getFile() {
+ return file;
+ }
}
static class EditLogFileInputStream extends EditLogInputStream {
@@ -275,17 +285,21 @@
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = FSNamesystem.now();
}
-
- private File getEditFile(int idx) {
- return fsimage.getEditFile(idx);
+
+ private File getEditFile(StorageDirectory sd) {
+ return fsimage.getEditFile(sd);
}
-
- private File getEditNewFile(int idx) {
- return fsimage.getEditNewFile(idx);
+
+ private File getEditNewFile(StorageDirectory sd) {
+ return fsimage.getEditNewFile(sd);
}
private int getNumStorageDirs() {
- return fsimage.getNumStorageDirs();
+ int numStorageDirs = 0;
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
+ numStorageDirs++;
+ return numStorageDirs;
}
synchronized int getNumEditStreams() {
@@ -304,18 +318,19 @@
*/
public synchronized void open() throws IOException {
numTransactions = totalTimeTransactions = 0;
- int size = getNumStorageDirs();
if (editStreams == null)
- editStreams = new ArrayList<EditLogOutputStream>(size);
- for (int idx = 0; idx < size; idx++) {
- File eFile = getEditFile(idx);
+ editStreams = new ArrayList<EditLogOutputStream>();
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ File eFile = getEditFile(sd);
try {
EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
editStreams.add(eStream);
} catch (IOException e) {
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
- fsimage.processIOError(idx);
- idx--;
+ // Remove the directory from list of storage directories
+ it.remove();
}
}
}
@@ -330,8 +345,9 @@
* Create edits.new if non existent.
*/
synchronized void createNewIfMissing() throws IOException {
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- File newFile = getEditNewFile(idx);
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ File newFile = getEditNewFile(it.next());
if (!newFile.exists())
createEditLogFile(newFile);
}
@@ -380,14 +396,39 @@
}
assert(index < getNumStorageDirs());
assert(getNumStorageDirs() == editStreams.size());
-
+
+ File parentStorageDir = ((EditLogFileOutputStream)editStreams
+ .get(index)).getFile()
+ .getParentFile().getParentFile();
editStreams.remove(index);
//
// Invoke the ioerror routine of the fsimage
//
- fsimage.processIOError(index);
+ fsimage.processIOError(parentStorageDir);
}
-
+
+ /**
+ * If there is an IO error on any log operation on a storage directory,
+ * remove any stream associated with that directory.
+ */
+ synchronized void processIOError(StorageDirectory sd) {
+ // Try to remove stream only if one should exist
+ if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ return;
+ if (editStreams == null || editStreams.size() <= 1) {
+ FSNamesystem.LOG.fatal(
+ "Fatal Error : All storage directories are inaccessible.");
+ Runtime.getRuntime().exit(-1);
+ }
+ for (int idx = 0; idx < editStreams.size(); idx++) {
+ File parentStorageDir = ((EditLogFileOutputStream)editStreams
+ .get(idx)).getFile()
+ .getParentFile().getParentFile();
+ if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+ editStreams.remove(idx);
+ }
+ }
+
/**
* The specified streams have IO errors. Remove them from logging
* new transactions.
@@ -412,20 +453,16 @@
}
processIOError(j);
}
- int failedStreamIdx = 0;
- while(failedStreamIdx >= 0) {
- failedStreamIdx = fsimage.incrementCheckpointTime();
- if(failedStreamIdx >= 0)
- processIOError(failedStreamIdx);
- }
+ fsimage.incrementCheckpointTime();
}
/**
* check if ANY edits.new log exists
*/
boolean existsNew() throws IOException {
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- if (getEditNewFile(idx).exists()) {
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ if (getEditNewFile(it.next()).exists()) {
return true;
}
}
@@ -1022,7 +1059,7 @@
synchronized long getEditLogSize() throws IOException {
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+ for (int idx = 0; idx < editStreams.size(); idx++) {
long curSize = editStreams.get(idx).length();
assert (size == 0 || size == curSize) : "All streams must be the same";
size = curSize;
@@ -1040,10 +1077,12 @@
// exists in all directories.
//
if (existsNew()) {
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- if (!getEditNewFile(idx).exists()) {
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ File editsNew = getEditNewFile(it.next());
+ if (!editsNew.exists()) {
throw new IOException("Inconsistent existance of edits.new " +
- getEditNewFile(idx));
+ editsNew);
}
}
return; // nothing to do, edits.new exists!
@@ -1054,14 +1093,18 @@
//
// Open edits.new
//
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ StorageDirectory sd = it.next();
try {
- EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(idx));
+ EditLogFileOutputStream eStream =
+ new EditLogFileOutputStream(getEditNewFile(sd));
eStream.create();
editStreams.add(eStream);
} catch (IOException e) {
- processIOError(idx);
- idx--;
+ // remove stream and this storage directory from list
+ processIOError(sd);
+ it.remove();
}
}
}
@@ -1083,16 +1126,18 @@
//
// Delete edits and rename edits.new to edits.
//
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
//
// renameTo() fails on Windows if the destination
// file exists.
//
- getEditFile(idx).delete();
- if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
- fsimage.processIOError(idx);
- idx--;
+ getEditFile(sd).delete();
+ if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
+ // Should we also remove from edits
+ it.remove();
}
}
}
@@ -1106,7 +1151,11 @@
* Return the name of the edit file
*/
synchronized File getFsEditName() throws IOException {
- return getEditFile(0);
+ StorageDirectory sd = null;
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
+ sd = it.next();
+ return getEditFile(sd);
}
/**
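The recurring pattern in the new FSEditLog is to iterate only EDITS-typed directories and, on failure, drop the directory from the live set with it.remove() rather than re-indexing a parallel array. A standalone sketch of that pattern (plain java.io; the current/edits layout matches the patch, everything else is hypothetical):

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;

  public class EditStreamOpener {
    // dirs holds the roots of the EDITS-typed storage directories.
    static List<FileOutputStream> openAll(List<File> dirs) {
      List<FileOutputStream> streams = new ArrayList<FileOutputStream>();
      for (Iterator<File> it = dirs.iterator(); it.hasNext(); ) {
        File eFile = new File(it.next(), "current/edits");
        try {
          streams.add(new FileOutputStream(eFile, true)); // append mode
        } catch (IOException e) {
          // A directory whose edits file cannot be opened leaves the live
          // set, exactly as FSEditLog.open() does via it.remove().
          it.remove();
        }
      }
      return streams;
    }
  }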
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Aug 29 16:13:12 2008
@@ -29,13 +29,14 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.SimpleDateFormat;
-import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
+import java.util.Map;
+import java.util.HashMap;
import java.lang.Math;
import java.nio.ByteBuffer;
@@ -58,6 +59,7 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
/**
* FSImage handles checkpointing and logging of the namespace edits.
@@ -83,6 +85,29 @@
String getName() {return fileName;}
}
+ /**
+ * Implementation of StorageDirType specific to namenode storage.
+ * A storage directory can be of type IMAGE, which stores only the fsimage;
+ * of type EDITS, which stores only edits; or of type IMAGE_AND_EDITS,
+ * which stores both the fsimage and edits.
+ */
+ static enum NameNodeDirType implements StorageDirType {
+ UNDEFINED,
+ IMAGE,
+ EDITS,
+ IMAGE_AND_EDITS;
+
+ public StorageDirType getStorageDirType() {
+ return this;
+ }
+
+ public boolean isOfType(StorageDirType type) {
+ if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
+ return true;
+ return this == type;
+ }
+ }
+
protected long checkpointTime = -1L;
private FSEditLog editLog = null;
private boolean isUpgradeFinalized = false;
@@ -90,6 +115,7 @@
* Directories for importing an image from a checkpoint.
*/
private Collection<File> checkpointDirs;
+ private Collection<File> checkpointEditsDirs;
/**
* Can fs-image be rolled?
@@ -111,9 +137,10 @@
/**
*/
- FSImage(Collection<File> fsDirs) throws IOException {
+ FSImage(Collection<File> fsDirs, Collection<File> fsEditsDirs)
+ throws IOException {
this();
- setStorageDirectories(fsDirs);
+ setStorageDirectories(fsDirs, fsEditsDirs);
}
public FSImage(StorageInfo storageInfo) {
@@ -126,57 +153,77 @@
public FSImage(File imageDir) throws IOException {
this();
ArrayList<File> dirs = new ArrayList<File>(1);
+ ArrayList<File> editsDirs = new ArrayList<File>(1);
dirs.add(imageDir);
- setStorageDirectories(dirs);
+ editsDirs.add(imageDir);
+ setStorageDirectories(dirs, editsDirs);
}
- void setStorageDirectories(Collection<File> fsDirs) throws IOException {
- this.storageDirs = new ArrayList<StorageDirectory>(fsDirs.size());
- for(Iterator<File> it = fsDirs.iterator(); it.hasNext();)
- this.addStorageDir(new StorageDirectory(it.next()));
+ void setStorageDirectories(Collection<File> fsNameDirs,
+ Collection<File> fsEditsDirs
+ ) throws IOException {
+ this.storageDirs = new ArrayList<StorageDirectory>();
+ // Add all name dirs with appropriate NameNodeDirType
+ for (File dirName : fsNameDirs) {
+ boolean isAlsoEdits = false;
+ for (File editsDirName : fsEditsDirs) {
+ if (editsDirName.compareTo(dirName) == 0) {
+ isAlsoEdits = true;
+ fsEditsDirs.remove(editsDirName);
+ break;
+ }
+ }
+ NameNodeDirType dirType = (isAlsoEdits) ?
+ NameNodeDirType.IMAGE_AND_EDITS :
+ NameNodeDirType.IMAGE;
+ this.addStorageDir(new StorageDirectory(dirName, dirType));
+ }
+
+ // Add edits dirs if they are different from name dirs
+ for (File dirName : fsEditsDirs) {
+ this.addStorageDir(new StorageDirectory(dirName,
+ NameNodeDirType.EDITS));
+ }
}
- void setCheckpointDirectories(Collection<File> dirs) {
+ void setCheckpointDirectories(Collection<File> dirs,
+ Collection<File> editsDirs) {
checkpointDirs = dirs;
- }
-
- /**
- */
- File getImageFile(int imageDirIdx, NameNodeFile type) {
- return getImageFile(getStorageDir(imageDirIdx), type);
+ checkpointEditsDirs = editsDirs;
}
static File getImageFile(StorageDirectory sd, NameNodeFile type) {
return new File(sd.getCurrentDir(), type.getName());
}
- File getEditFile(int idx) {
- return getImageFile(idx, NameNodeFile.EDITS);
+ File getEditFile(StorageDirectory sd) {
+ return getImageFile(sd, NameNodeFile.EDITS);
}
- File getEditNewFile(int idx) {
- return getImageFile(idx, NameNodeFile.EDITS_NEW);
+ File getEditNewFile(StorageDirectory sd) {
+ return getImageFile(sd, NameNodeFile.EDITS_NEW);
}
- File[] getFileNames(NameNodeFile type) {
- File[] list = new File[getNumStorageDirs()];
- int i=0;
- for(StorageDirectory sd : storageDirs) {
- list[i++] = getImageFile(sd, type);
+ File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
+ ArrayList<File> list = new ArrayList<File>();
+ Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
+ dirIterator(dirType);
+ for ( ;it.hasNext(); ) {
+ list.add(getImageFile(it.next(), type));
}
- return list;
+ return list.toArray(new File[list.size()]);
}
File[] getImageFiles() {
- return getFileNames(NameNodeFile.IMAGE);
+ return getFileNames(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
}
File[] getEditsFiles() {
- return getFileNames(NameNodeFile.EDITS);
+ return getFileNames(NameNodeFile.EDITS, NameNodeDirType.EDITS);
}
File[] getTimeFiles() {
- return getFileNames(NameNodeFile.TIME);
+ return getFileNames(NameNodeFile.TIME, null);
}
/**
@@ -191,25 +238,36 @@
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(Collection<File> dataDirs,
- StartupOption startOpt
- ) throws IOException {
+ Collection<File> editsDirs,
+ StartupOption startOpt
+ ) throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
-
+
+ // none of the data dirs exist
+ if (dataDirs.size() == 0 || editsDirs.size() == 0)
+ throw new IOException(
+ "All specified directories are not accessible or do not exist.");
+
if(startOpt == StartupOption.IMPORT
&& (checkpointDirs == null || checkpointDirs.isEmpty()))
throw new IOException("Cannot import image from a checkpoint. "
+ "\"fs.checkpoint.dir\" is not set." );
+ if(startOpt == StartupOption.IMPORT
+ && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
+ throw new IOException("Cannot import image from a checkpoint. "
+ + "\"fs.checkpoint.edits.dir\" is not set." );
+
+ setStorageDirectories(dataDirs, editsDirs);
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- AbstractList<StorageState> dataDirStates =
- new ArrayList<StorageState>(dataDirs.size());
+ Map<StorageDirectory, StorageState> dataDirStates =
+ new HashMap<StorageDirectory, StorageState>();
boolean isFormatted = false;
- for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
- File dataDir = it.next();
- StorageDirectory sd = new StorageDirectory(dataDir);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt);
@@ -217,7 +275,7 @@
switch(curState) {
case NON_EXISTENT:
// name-node fails if any of the configured storage dirs are missing
- throw new InconsistentFSStateException(sd.root,
+ throw new InconsistentFSStateException(sd.getRoot(),
"storage directory does not exist or is not accessible.");
case NOT_FORMATTED:
break;
@@ -234,19 +292,14 @@
if (startOpt == StartupOption.IMPORT && isFormatted)
// import of a checkpoint is allowed only into empty image directories
throw new IOException("Cannot import image from a checkpoint. "
- + " NameNode already contains an image in " + sd.root);
+ + " NameNode already contains an image in " + sd.getRoot());
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
- // add to the storage list
- addStorageDir(sd);
- dataDirStates.add(curState);
+ dataDirStates.put(sd,curState);
}
-
- if (dataDirs.size() == 0) // none of the data dirs exist
- throw new IOException(
- "All specified directories are not accessible or do not exist.");
+
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.IMPORT)
throw new IOException("NameNode is not formatted.");
@@ -265,14 +318,15 @@
// 2. Format unformatted dirs.
this.checkpointTime = 0L;
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
- StorageState curState = dataDirStates.get(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
assert false : StorageState.NON_EXISTENT + " state cannot be here";
case NOT_FORMATTED:
- LOG.info("Storage directory " + sd.root + " is not formatted.");
+ LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current dir
break;
@@ -308,10 +362,11 @@
}
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
if (sd.getPreviousDir().exists())
- throw new InconsistentFSStateException(sd.root,
+ throw new InconsistentFSStateException(sd.getRoot(),
"previous fs state should not exist during upgrade. "
+ "Finalize or rollback first.");
}
@@ -325,9 +380,10 @@
int oldLV = this.getLayoutVersion();
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
- LOG.info("Upgrading image directory " + sd.root
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ LOG.info("Upgrading image directory " + sd.getRoot()
+ ".\n old LV = " + oldLV
+ "; old CTime = " + oldCTime
+ ".\n new LV = " + this.getLayoutVersion()
@@ -350,7 +406,7 @@
// rename tmp to previous
rename(tmpDir, prevDir);
isUpgradeFinalized = false;
- LOG.info("Upgrade of " + sd.root + " is complete.");
+ LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
initializeDistributedUpgrade();
editLog.open();
@@ -363,16 +419,17 @@
boolean canRollback = false;
FSImage prevState = new FSImage();
prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // use current directory then
- LOG.info("Storage directory " + sd.root
+ LOG.info("Storage directory " + sd.getRoot()
+ " does not contain previous fs state.");
sd.read(); // read and verify consistency with other directories
continue;
}
- StorageDirectory sdPrev = prevState.new StorageDirectory(sd.root);
+ StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir
canRollback = true;
}
@@ -382,13 +439,14 @@
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
continue;
- LOG.info("Rolling back storage directory " + sd.root
+ LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n new LV = " + prevState.getLayoutVersion()
+ "; new CTime = " + prevState.getCTime());
File tmpDir = sd.getRemovedTmp();
@@ -402,7 +460,7 @@
// delete tmp dir
deleteDir(tmpDir);
- LOG.info("Rollback of " + sd.root + " is complete.");
+ LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
}
isUpgradeFinalized = true;
// check whether name-node can start in regular mode
@@ -413,11 +471,11 @@
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
LOG.info("Directory " + prevDir + " does not exist.");
- LOG.info("Finalize upgrade for " + sd.root + " is not required.");
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
return;
}
LOG.info("Finalizing upgrade for storage directory "
- + sd.root + "."
+ + sd.getRoot() + "."
+ (getLayoutVersion()==0 ? "" :
"\n cur LV = " + this.getLayoutVersion()
+ "; cur CTime = " + this.getCTime()));
@@ -427,7 +485,7 @@
rename(prevDir, tmpDir);
deleteDir(tmpDir);
isUpgradeFinalized = true;
- LOG.info("Finalize upgrade for " + sd.root + " is complete.");
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
}
/**
@@ -443,7 +501,8 @@
fsNamesys.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
- ckptImage.recoverTransitionRead(checkpointDirs, StartupOption.REGULAR);
+ ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
+ StartupOption.REGULAR);
} finally {
ckptImage.close();
}
@@ -455,8 +514,10 @@
}
void finalizeUpgrade() throws IOException {
- for(int idx = 0; idx < getNumStorageDirs(); idx++)
- doFinalize(getStorageDir(idx));
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ doFinalize(it.next());
+ }
}
boolean isUpgradeFinalized() {
@@ -469,7 +530,7 @@
super.getFields(props, sd);
if (layoutVersion == 0)
throw new IOException("NameNode directory "
- + sd.root + " is not formatted.");
+ + sd.getRoot() + " is not formatted.");
String sDUS, sDUV;
sDUS = props.getProperty("distributedUpgradeState");
sDUV = props.getProperty("distributedUpgradeVersion");
@@ -539,30 +600,38 @@
/**
* Record new checkpoint time in order to
* distinguish healthy directories from the removed ones.
- *
- * @return -1 if successful, or the index of the failed storage directory.
+ * If there is an error writing the new checkpoint time, the corresponding
+ * storage directory is removed from the list.
*/
- int incrementCheckpointTime() {
+ void incrementCheckpointTime() {
this.checkpointTime++;
- // Write new checkpoint time.
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
+
+ // Write new checkpoint time in all storage directories
+ for(Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
try {
- StorageDirectory sd = getStorageDir(idx);
writeCheckpointTime(sd);
- } catch(IOException e) {
- return idx;
+ } catch(IOException e) {
+ // Close any edits stream associated with this dir and remove directory
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ editLog.processIOError(sd);
+ it.remove();
}
}
- return -1;
}
-
+
/**
- * If there is an IO Error on any log operations, remove that
- * directory from the list of directories.
+ * Remove the storage directory whose root is the parent of the given directory.
*/
- void processIOError(int index) {
- assert(index >= 0 && index < getNumStorageDirs());
- storageDirs.remove(index);
+
+ void processIOError(File dirName) {
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ if (sd.getRoot().getPath().equals(dirName.getParent()))
+ it.remove();
+ }
}
public FSEditLog getEditLog() {
@@ -570,10 +639,10 @@
}
public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
- File oldImageDir = new File(sd.root, "image");
+ File oldImageDir = new File(sd.getRoot(), "image");
if (!oldImageDir.exists()) {
if(sd.getVersionFile().exists())
- throw new InconsistentFSStateException(sd.root,
+ throw new InconsistentFSStateException(sd.getRoot(),
oldImageDir + " does not exist.");
return false;
}
@@ -594,15 +663,19 @@
//
// Atomic move sequence, to recover from interrupted checkpoint
//
- void recoverInterruptedCheckpoint(StorageDirectory sd) throws IOException {
- File curFile = getImageFile(sd, NameNodeFile.IMAGE);
- File ckptFile = getImageFile(sd, NameNodeFile.IMAGE_NEW);
+ boolean recoverInterruptedCheckpoint(StorageDirectory nameSD,
+ StorageDirectory editsSD)
+ throws IOException {
+ boolean needToSave = false;
+ File curFile = getImageFile(nameSD, NameNodeFile.IMAGE);
+ File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW);
//
// If we were in the midst of a checkpoint
//
if (ckptFile.exists()) {
- if (getImageFile(sd, NameNodeFile.EDITS_NEW).exists()) {
+ needToSave = true;
+ if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
//
// checkpointing might have uploaded a new
// merged image, but we discard it here because we are
@@ -622,7 +695,8 @@
// if the destination file already exists.
//
if (!ckptFile.renameTo(curFile)) {
- curFile.delete();
+ if (!curFile.delete())
+ LOG.warn("Unable to delete dir " + curFile + " before rename");
if (!ckptFile.renameTo(curFile)) {
throw new IOException("Unable to rename " + ckptFile +
" to " + curFile);
@@ -630,6 +704,7 @@
}
}
}
+ return needToSave;
}
/**
@@ -641,53 +716,74 @@
*/
boolean loadFSImage() throws IOException {
// Now check all curFiles and see which is the newest
- long latestCheckpointTime = Long.MIN_VALUE;
- StorageDirectory latestSD = null;
+ long latestNameCheckpointTime = Long.MIN_VALUE;
+ long latestEditsCheckpointTime = Long.MIN_VALUE;
+ StorageDirectory latestNameSD = null;
+ StorageDirectory latestEditsSD = null;
boolean needToSave = false;
isUpgradeFinalized = true;
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
- recoverInterruptedCheckpoint(sd);
+ for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
if (!sd.getVersionFile().exists()) {
needToSave |= true;
continue; // some of them might have just been formatted
}
- assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
- "Image file must exist.";
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE))
+ assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
+ "Image file must exist.";
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ assert getImageFile(sd, NameNodeFile.EDITS).exists() :
+ "Edits file must exist.";
+
checkpointTime = readCheckpointTime(sd);
if ((checkpointTime != Long.MIN_VALUE) &&
- (checkpointTime != latestCheckpointTime)) {
+ ((checkpointTime != latestNameCheckpointTime) ||
+ (checkpointTime != latestEditsCheckpointTime))) {
// Force saving of new image if checkpoint time
// is not same in all of the storage directories.
needToSave |= true;
}
- if (latestCheckpointTime < checkpointTime) {
- latestCheckpointTime = checkpointTime;
- latestSD = sd;
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) &&
+ (latestNameCheckpointTime < checkpointTime)) {
+ latestNameCheckpointTime = checkpointTime;
+ latestNameSD = sd;
+ }
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) &&
+ (latestEditsCheckpointTime < checkpointTime)) {
+ latestEditsCheckpointTime = checkpointTime;
+ latestEditsSD = sd;
}
if (checkpointTime <= 0L)
needToSave |= true;
// set finalized flag
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
}
- assert latestSD != null : "Latest storage directory was not determined.";
+ assert latestNameSD != null : "Latest image storage directory was " +
+ "not determined.";
+ assert latestEditsSD != null : "Latest edits storage directory was " +
+ "not determined.";
+
+ // Make sure we are loading image and edits from same checkpoint
+ if (latestNameCheckpointTime != latestEditsCheckpointTime)
+ throw new IOException("Inconsitent storage detected, " +
+ "name and edits storage do not match");
+
+ // Recover from a previous interrupted checkpoint, if any
+ needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
long startTime = FSNamesystem.now();
- long imageSize = getImageFile(latestSD, NameNodeFile.IMAGE).length();
+ long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
//
// Load in bits
//
- latestSD.read();
- needToSave |= loadFSImage(getImageFile(latestSD, NameNodeFile.IMAGE));
-
+ latestNameSD.read();
+ needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
LOG.info("Image file of size " + imageSize + " loaded in "
+ (FSNamesystem.now() - startTime)/1000 + " seconds.");
- //
- // read in the editlog from the same directory from
- // which we read in the image
- //
- needToSave |= (loadFSEdits(latestSD) > 0);
+
+ // Load latest edits
+ needToSave |= (loadFSEdits(latestEditsSD) > 0);
return needToSave;
}
@@ -911,13 +1007,18 @@
*/
public void saveFSImage() throws IOException {
editLog.createNewIfMissing();
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
- saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
- editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
- File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
- if (editsNew.exists())
- editLog.createEditLogFile(editsNew);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+ if (dirType.isOfType(NameNodeDirType.IMAGE))
+ saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
+ if (dirType.isOfType(NameNodeDirType.EDITS)) {
+ editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+ File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
+ if (editsNew.exists())
+ editLog.createEditLogFile(editsNew);
+ }
}
ckptState = CheckpointStates.UPLOAD_DONE;
rollFSImage();
@@ -950,13 +1051,16 @@
sd.clearDirectory(); // create current dir
sd.lock();
try {
- saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
- editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+ NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+ if (dirType.isOfType(NameNodeDirType.IMAGE))
+ saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
+ if (dirType.isOfType(NameNodeDirType.EDITS))
+ editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
sd.write();
} finally {
sd.unlock();
}
- LOG.info("Storage directory " + sd.root
+ LOG.info("Storage directory " + sd.getRoot()
+ " has been successfully formatted.");
}
@@ -965,8 +1069,9 @@
this.namespaceID = newNamespaceID();
this.cTime = 0L;
this.checkpointTime = FSNamesystem.now();
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
format(sd);
}
}
@@ -1153,8 +1258,9 @@
if (!editLog.existsNew()) {
throw new IOException("New Edits file does not exist");
}
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ StorageDirectory sd = it.next();
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
if (!ckpt.exists()) {
throw new IOException("Checkpoint file " + ckpt +
@@ -1166,8 +1272,9 @@
//
// Renames new image
//
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ StorageDirectory sd = it.next();
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
File curFile = getImageFile(sd, NameNodeFile.IMAGE);
// renameTo fails on Windows if the destination file
@@ -1175,25 +1282,31 @@
if (!ckpt.renameTo(curFile)) {
curFile.delete();
if (!ckpt.renameTo(curFile)) {
- editLog.processIOError(idx);
- idx--;
+ // Close edit stream, if this directory is also used for edits
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ editLog.processIOError(sd);
+ it.remove();
}
}
}
//
- // Updates the fstime file and write version file
+ // Updates the fstime file on all directories (fsimage and edits)
+ // and write version file
//
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- StorageDirectory sd = getStorageDir(idx);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
try {
sd.write();
} catch (IOException e) {
- LOG.error("Cannot write file " + sd.root, e);
- editLog.processIOError(idx);
- idx--;
+ LOG.error("Cannot write file " + sd.getRoot(), e);
+ // Close edit stream, if this directory is also used for edits
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ editLog.processIOError(sd);
+ it.remove();
}
}
ckptState = CheckpointStates.START;
@@ -1244,7 +1357,11 @@
* Return the name of the image file.
*/
File getFsImageName() {
- return getImageFile(0, NameNodeFile.IMAGE);
+ StorageDirectory sd = null;
+ for (Iterator<StorageDirectory> it =
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+ sd = it.next();
+ return getImageFile(sd, NameNodeFile.IMAGE);
}
public File getFsEditName() throws IOException {
@@ -1252,7 +1369,12 @@
}
File getFsTimeName() {
- return getImageFile(0, NameNodeFile.TIME);
+ StorageDirectory sd = null;
+ // NameNodeFile.TIME should be the same on all directories
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();)
+ sd = it.next();
+ return getImageFile(sd, NameNodeFile.TIME);
}
/**
@@ -1260,11 +1382,12 @@
* checkpointing.
*/
File[] getFsImageNameCheckpoint() {
- File[] list = new File[getNumStorageDirs()];
- for(int i = 0; i < getNumStorageDirs(); i++) {
- list[i] = getImageFile(getStorageDir(i), NameNodeFile.IMAGE_NEW);
+ ArrayList<File> list = new ArrayList<File>();
+ for (Iterator<StorageDirectory> it =
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW));
}
- return list;
+ return list.toArray(new File[list.size()]);
}
/**
@@ -1383,6 +1506,20 @@
}
return dirs;
}
+
+ static Collection<File> getCheckpointEditsDirs(Configuration conf,
+ String defaultName) {
+ Collection<String> dirNames =
+ conf.getStringCollection("fs.checkpoint.edits.dir");
+ if (dirNames.size() == 0 && defaultName != null) {
+ dirNames.add(defaultName);
+ }
+ Collection<File> dirs = new ArrayList<File>(dirNames.size());
+ for(String name : dirNames) {
+ dirs.add(new File(name));
+ }
+ return dirs;
+ }
static private final UTF8 U_STR = new UTF8();
static String readString(DataInputStream in) throws IOException {
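The matching rule behind NameNodeDirType deserves a worked example: a directory listed in both dfs.name.dir and dfs.name.edits.dir is typed IMAGE_AND_EDITS by setStorageDirectories() and is therefore visited by both dirIterator(IMAGE) and dirIterator(EDITS). A standalone restatement of the rule, for illustration only:

  public enum DirTypeDemo {
    UNDEFINED, IMAGE, EDITS, IMAGE_AND_EDITS;

    boolean isOfType(DirTypeDemo type) {
      // A combined directory matches queries for either single role.
      if (this == IMAGE_AND_EDITS && (type == IMAGE || type == EDITS))
        return true;
      return this == type;
    }

    public static void main(String[] args) {
      System.out.println(IMAGE_AND_EDITS.isOfType(IMAGE)); // true
      System.out.println(IMAGE_AND_EDITS.isOfType(EDITS)); // true
      System.out.println(IMAGE.isOfType(EDITS));           // false
    }
  }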
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Aug 29 16:13:12 2008
@@ -302,7 +302,8 @@
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
this.dir = new FSDirectory(this, conf);
StartupOption startOpt = NameNode.getStartupOption(conf);
- this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
+ this.dir.loadFSImage(getNamespaceDirs(conf),
+ getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().fsImageLoadTime.set(
@@ -385,6 +386,18 @@
}
return dirs;
}
+
+ public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
+ Collection<String> editsDirNames =
+ conf.getStringCollection("dfs.name.edits.dir");
+ if (editsDirNames.isEmpty())
+ editsDirNames.add("/tmp/hadoop/dfs/name");
+ Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
+ for(String name : editsDirNames) {
+ dirs.add(new File(name));
+ }
+ return dirs;
+ }
/**
* dirs is a list of directories where the filesystem directory state
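Note how the default resolves in practice: hadoop-default.xml sets dfs.name.edits.dir to ${dfs.name.dir}, so the hard-coded /tmp fallback above only applies when no default resource is loaded. A small check of that behavior (assuming the stock Configuration with its default resources on the classpath):

  import org.apache.hadoop.conf.Configuration;

  public class EditsDirDefaultDemo {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.set("dfs.name.dir", "/disk1/dfs/name");
      // dfs.name.edits.dir is unset, so ${dfs.name.dir} is expanded and
      // this prints: [/disk1/dfs/name]
      System.out.println(conf.getStringCollection("dfs.name.edits.dir"));
    }
  }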
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Aug 29 16:13:12 2008
@@ -742,6 +742,8 @@
boolean isConfirmationNeeded
) throws IOException {
Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
+ Collection<File> editDirsToFormat =
+ FSNamesystem.getNamespaceEditsDirs(conf);
for(Iterator<File> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = it.next();
if (!curDir.exists())
@@ -756,7 +758,8 @@
}
}
- FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat), conf);
+ FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
+ editDirsToFormat), conf);
nsys.dir.fsImage.format();
return false;
}
@@ -765,7 +768,10 @@
boolean isConfirmationNeeded
) throws IOException {
Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
- FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat), conf);
+ Collection<File> editDirsToFormat =
+ FSNamesystem.getNamespaceEditsDirs(conf);
+ FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
+ editDirsToFormat), conf);
System.err.print(
"\"finalize\" will remove the previous state of the files system.\n"
+ "Recent upgrade will become permanent.\n"
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Aug 29 16:13:12 2008
@@ -34,6 +34,8 @@
import java.net.*;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+
import org.apache.hadoop.metrics.jvm.JvmMetrics;
/**********************************************************
@@ -66,6 +68,7 @@
private String infoBindAddress;
private Collection<File> checkpointDirs;
+ private Collection<File> checkpointEditsDirs;
private long checkpointPeriod; // in seconds
private long checkpointSize; // size (in MB) of current Edit Log
@@ -135,8 +138,10 @@
fsName = getInfoServer();
checkpointDirs = FSImage.getCheckpointDirs(conf,
"/tmp/hadoop/dfs/namesecondary");
+ checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf,
+ "/tmp/hadoop/dfs/namesecondary");
checkpointImage = new CheckpointStorage();
- checkpointImage.recoverCreate(checkpointDirs);
+ checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
// Initialize other scheduling parameters from the configuration
checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
@@ -329,7 +334,7 @@
private void startCheckpoint() throws IOException {
checkpointImage.unlockAll();
checkpointImage.getEditLog().close();
- checkpointImage.recoverCreate(checkpointDirs);
+ checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
checkpointImage.startCheckpoint();
}
@@ -483,23 +488,29 @@
* Recover from an unsuccessful checkpoint if necessary.
*
* @param dataDirs
+ * @param editsDirs
* @throws IOException
*/
- void recoverCreate(Collection<File> dataDirs) throws IOException {
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- for(File dataDir : dataDirs) {
+ void recoverCreate(Collection<File> dataDirs,
+ Collection<File> editsDirs) throws IOException {
+ Collection<File> tempDataDirs = new ArrayList<File>(dataDirs);
+ Collection<File> tempEditsDirs = new ArrayList<File>(editsDirs);
+ this.storageDirs = new ArrayList<StorageDirectory>();
+ setStorageDirectories(tempDataDirs, tempEditsDirs);
+ for (Iterator<StorageDirectory> it =
+ dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
boolean isAccessible = true;
try { // create directories if they don't exist yet
- if(!dataDir.mkdirs()) {
- // do nothing, directory is already ctreated
+ if(!sd.getRoot().mkdirs()) {
+ // do nothing, directory is already created
}
} catch(SecurityException se) {
isAccessible = false;
}
if(!isAccessible)
- throw new InconsistentFSStateException(dataDir,
+ throw new InconsistentFSStateException(sd.getRoot(),
"cannot access checkpoint directory.");
- StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(StartupOption.REGULAR);
@@ -507,7 +518,7 @@
switch(curState) {
case NON_EXISTENT:
// fail if any of the configured checkpoint dirs are inaccessible
- throw new InconsistentFSStateException(sd.root,
+ throw new InconsistentFSStateException(sd.getRoot(),
"checkpoint directory does not exist or is not accessible.");
case NOT_FORMATTED:
break; // it's ok since initially there is no current and VERSION
@@ -520,9 +531,6 @@
sd.unlock();
throw ioe;
}
- // add to the storage list
- addStorageDir(sd);
- LOG.warn("Checkpoint directory " + sd.root + " is added.");
}
}
@@ -566,9 +574,19 @@
*/
private void doMerge(CheckpointSignature sig) throws IOException {
getEditLog().open();
- StorageDirectory sd = getStorageDir(0);
- loadFSImage(FSImage.getImageFile(sd, NameNodeFile.IMAGE));
- loadFSEdits(sd);
+ StorageDirectory sdName = null;
+ StorageDirectory sdEdits = null;
+ Iterator<StorageDirectory> it = null;
+ it = dirIterator(NameNodeDirType.IMAGE);
+ if (it.hasNext())
+ sdName = it.next();
+ it = dirIterator(NameNodeDirType.EDITS);
+ if (it.hasNext())
+ sdEdits = it.next();
+ if ((sdName == null) || (sdEdits == null))
+ throw new IOException("Could not locate checkpoint directories");
+ loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+ loadFSEdits(sdEdits);
sig.validateStorageInfo(this);
saveFSImage();
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Aug 29 16:13:12 2008
@@ -96,7 +96,8 @@
public MiniDFSCluster(Configuration conf,
int numDataNodes,
StartupOption nameNodeOperation) throws IOException {
- this(0, conf, numDataNodes, false, false, nameNodeOperation, null, null, null);
+ this(0, conf, numDataNodes, false, false, false, nameNodeOperation,
+ null, null, null);
}
/**
@@ -116,7 +117,7 @@
int numDataNodes,
boolean format,
String[] racks) throws IOException {
- this(0, conf, numDataNodes, format, true, null, racks, null, null);
+ this(0, conf, numDataNodes, format, true, true, null, racks, null, null);
}
/**
@@ -137,7 +138,7 @@
int numDataNodes,
boolean format,
String[] racks, String[] hosts) throws IOException {
- this(0, conf, numDataNodes, format, true, null, racks, hosts, null);
+ this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
}
/**
@@ -165,8 +166,8 @@
boolean manageDfsDirs,
StartupOption operation,
String[] racks) throws IOException {
- this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation,
- racks, null, null);
+ this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
+ operation, racks, null, null);
}
/**
@@ -196,8 +197,8 @@
StartupOption operation,
String[] racks,
long[] simulatedCapacities) throws IOException {
- this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, racks, null,
- simulatedCapacities);
+ this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
+ operation, racks, null, simulatedCapacities);
}
/**
@@ -212,8 +213,10 @@
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param format if true, format the NameNode and DataNodes before starting up
- * @param manageDfsDirs if true, the data directories for servers will be
- * created and dfs.name.dir and dfs.data.dir will be set in the conf
+ * @param manageNameDfsDirs if true, the name directories will be created
+ * and dfs.name.dir and fs.checkpoint.dir will be set in the conf
+ * @param manageDataDfsDirs if true, the data directories for datanodes will
+ * be created and dfs.data.dir will be set in the conf
* @param operation the operation with which to start the servers. If null
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
* @param racks array of strings indicating the rack that each DataNode is on
@@ -224,7 +227,8 @@
Configuration conf,
int numDataNodes,
boolean format,
- boolean manageDfsDirs,
+ boolean manageNameDfsDirs,
+ boolean manageDataDfsDirs,
StartupOption operation,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
@@ -242,9 +246,11 @@
// Setup the NameNode configuration
FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
conf.set("dfs.http.address", "127.0.0.1:0");
- if (manageDfsDirs) {
+ if (manageNameDfsDirs) {
conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
new File(base_dir, "name2").getPath());
+ conf.set("fs.checkpoint.dir", new File(base_dir, "namesecondary1").
+ getPath()+"," + new File(base_dir, "namesecondary2").getPath());
}
int replication = conf.getInt("dfs.replication", 3);
@@ -270,7 +276,8 @@
nameNode = NameNode.createNameNode(args, conf);
// Start the DataNodes
- startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, simulatedCapacities);
+ startDataNodes(conf, numDataNodes, manageDataDfsDirs,
+ operation, racks, hosts, simulatedCapacities);
waitClusterUp();
}
@@ -686,13 +693,20 @@
}
/**
- * Get the directories where the namenode stores its state.
+ * Get the directories where the namenode stores its image.
*/
public Collection<File> getNameDirs() {
return FSNamesystem.getNamespaceDirs(conf);
}
/**
+ * Get the directories where the namenode stores its edits.
+ */
+ public Collection<File> getNameEditsDirs() {
+ return FSNamesystem.getNamespaceEditsDirs(conf);
+ }
+
+ /**
* Wait until the cluster is active and running.
*/
public void waitActive() throws IOException {
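
The constructor split lets tests manage name-node directories and datanode
directories independently. Below is a hedged sketch of driving the new
ten-argument constructor; the paths and the choice to self-manage dfs.data.dir
are illustrative, not from the patch (a null operation falls back to REGULAR
per the javadoc above):

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class ClusterStartSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Let the cluster manage dfs.name.dir and fs.checkpoint.dir, but supply
        // our own datanode storage by passing manageDataDfsDirs = false below.
        conf.set("dfs.data.dir", new File("/tmp/test-dfs", "data1").getPath());
        // Arguments: port, conf, numDataNodes, format, manageNameDfsDirs,
        // manageDataDfsDirs, operation, racks, hosts, simulatedCapacities.
        MiniDFSCluster cluster = new MiniDFSCluster(0, conf, 1, true,
            true, false, null, null, null, null);
        try {
          cluster.waitActive();
          System.out.println("name dirs:  " + cluster.getNameDirs());
          System.out.println("edits dirs: " + cluster.getNameEditsDirs());
        } finally {
          cluster.shutdown();
        }
      }
    }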
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Fri Aug 29 16:13:12 2008
@@ -21,6 +21,7 @@
import java.io.*;
import java.util.Collection;
import java.util.List;
+import java.util.Iterator;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -28,6 +29,8 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.ErrorSimulator;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -173,10 +176,16 @@
// and that temporary checkpoint files are gone.
FSImage image = cluster.getNameNode().getFSImage();
int nrDirs = image.getNumStorageDirs();
- for(int idx = 0; idx < nrDirs; idx++) {
- assertFalse(image.getImageFile(idx, NameNodeFile.IMAGE_NEW).exists());
- assertFalse(image.getEditNewFile(idx).exists());
- File edits = image.getEditFile(idx);
+ for (Iterator<StorageDirectory> it =
+ image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ assertFalse(FSImage.getImageFile(sd, NameNodeFile.IMAGE_NEW).exists());
+ }
+ for (Iterator<StorageDirectory> it =
+ image.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ assertFalse(image.getEditNewFile(sd).exists());
+ File edits = image.getEditFile(sd);
assertTrue(edits.exists()); // edits should exist and be empty
long editsLen = edits.length();
assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
@@ -335,7 +344,12 @@
FSImage image = cluster.getNameNode().getFSImage();
try {
assertTrue(!fileSys.exists(file1));
- long fsimageLength = image.getImageFile(0, NameNodeFile.IMAGE).length();
+ StorageDirectory sd = null;
+ for (Iterator<StorageDirectory> it =
+ image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+ sd = it.next();
+ assertTrue(sd != null);
+ long fsimageLength = FSImage.getImageFile(sd, NameNodeFile.IMAGE).length();
//
// Make the checkpoint
//
@@ -352,9 +366,9 @@
ErrorSimulator.clearErrorSimulation(2);
// Verify that image file sizes did not change.
- int nrDirs = image.getNumStorageDirs();
- for(int idx = 0; idx < nrDirs; idx++) {
- assertTrue(image.getImageFile(idx,
+ for (Iterator<StorageDirectory> it =
+ image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ assertTrue(FSImage.getImageFile(it.next(),
NameNodeFile.IMAGE).length() == fsimageLength);
}
@@ -385,8 +399,11 @@
void testStartup(Configuration conf) throws IOException {
System.out.println("Startup of the name-node in the checkpoint directory.");
String primaryDirs = conf.get("dfs.name.dir");
+ String primaryEditsDirs = conf.get("dfs.name.edits.dir");
String checkpointDirs = conf.get("fs.checkpoint.dir");
- NameNode nn = startNameNode(conf, checkpointDirs, StartupOption.REGULAR);
+ String checkpointEditsDirs = conf.get("fs.checkpoint.edits.dir");
+ NameNode nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
+ StartupOption.REGULAR);
// Starting secondary node in the same directory as the primary
System.out.println("Startup of secondary in the same dir as the primary.");
@@ -403,7 +420,8 @@
// Starting primary node in the same directory as the secondary
System.out.println("Startup of primary in the same dir as the secondary.");
// secondary won't start without primary
- nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+ nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+ StartupOption.REGULAR);
boolean succeed = false;
do {
try {
@@ -415,7 +433,8 @@
} while(!succeed);
nn.stop(); nn = null;
try {
- nn = startNameNode(conf, checkpointDirs, StartupOption.REGULAR);
+ nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
+ StartupOption.REGULAR);
assertFalse(nn.getFSImage().isLockSupported(0));
nn.stop(); nn = null;
} catch (IOException e) { // expected to fail
@@ -425,7 +444,8 @@
// Try another secondary in the same directory
System.out.println("Startup of two secondaries in the same dir.");
// secondary won't start without primary
- nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+ nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+ StartupOption.REGULAR);
SecondaryNameNode secondary2 = null;
try {
secondary2 = startSecondaryNameNode(conf);
@@ -440,15 +460,18 @@
// Import a checkpoint with existing primary image.
System.out.println("Import a checkpoint with existing primary image.");
try {
- nn = startNameNode(conf, primaryDirs, StartupOption.IMPORT);
+ nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+ StartupOption.IMPORT);
assertTrue(false);
} catch (IOException e) { // expected to fail
assertTrue(nn == null);
}
-
+
// Remove current image and import a checkpoint.
System.out.println("Import a checkpoint with the primary image removed.");
List<File> nameDirs = (List<File>)FSNamesystem.getNamespaceDirs(conf);
+ List<File> nameEditsDirs = (List<File>)FSNamesystem.
+ getNamespaceEditsDirs(conf);
long fsimageLength = new File(new File(nameDirs.get(0), "current"),
NameNodeFile.IMAGE.getName()).length();
for(File dir : nameDirs) {
@@ -458,18 +481,29 @@
if (!dir.mkdirs())
throw new IOException("Cannot create directory " + dir);
}
- nn = startNameNode(conf, primaryDirs, StartupOption.IMPORT);
+
+ for(File dir : nameEditsDirs) {
+ if(dir.exists())
+ if(!(FileUtil.fullyDelete(dir)))
+ throw new IOException("Cannot remove directory: " + dir);
+ if (!dir.mkdirs())
+ throw new IOException("Cannot create directory " + dir);
+ }
+
+ nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+ StartupOption.IMPORT);
// Verify that image file sizes did not change.
FSImage image = nn.getFSImage();
- int nrDirs = image.getNumStorageDirs();
- for(int idx = 0; idx < nrDirs; idx++) {
- assertTrue(image.getImageFile(idx,
- NameNodeFile.IMAGE).length() == fsimageLength);
+ for (Iterator<StorageDirectory> it =
+ image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ assertTrue(FSImage.getImageFile(it.next(),
+ NameNodeFile.IMAGE).length() == fsimageLength);
}
nn.stop();
// recover failed checkpoint
- nn = startNameNode(conf, primaryDirs, StartupOption.REGULAR);
+ nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
+ StartupOption.REGULAR);
Collection<File> secondaryDirs = FSImage.getCheckpointDirs(conf, null);
for(File dir : secondaryDirs) {
Storage.rename(new File(dir, "current"),
@@ -504,10 +538,12 @@
NameNode startNameNode( Configuration conf,
String imageDirs,
+ String editsDirs,
StartupOption start) throws IOException {
conf.set("fs.default.name", "hdfs://localhost:0");
conf.set("dfs.http.address", "0.0.0.0:0");
conf.set("dfs.name.dir", imageDirs);
+ conf.set("dfs.name.edits.dir", editsDirs);
String[] args = new String[]{start.getName()};
NameNode nn = NameNode.createNameNode(args, conf);
assertTrue(nn.isInSafeMode());
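
The test wires dfs.name.edits.dir and fs.checkpoint.edits.dir the same way a
deployment would. From hadoop-default.xml, fs.checkpoint.edits.dir defaults to
${fs.checkpoint.dir}, and dfs.name.edits.dir presumably mirrors that for
dfs.name.dir, so nothing changes unless the properties are set explicitly. A
sketch with hypothetical disk paths:

    import org.apache.hadoop.conf.Configuration;

    public class SeparateEditsConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Primary name-node: image on one disk, edit log on another.
        conf.set("dfs.name.dir", "/disk1/dfs/name");
        conf.set("dfs.name.edits.dir", "/disk2/dfs/edits");
        // The secondary's checkpoint storage gets the same split.
        conf.set("fs.checkpoint.dir", "/disk1/dfs/namesecondary");
        conf.set("fs.checkpoint.edits.dir", "/disk2/dfs/namesecondary");
        System.out.println("image dirs: " + conf.get("dfs.name.dir"));
        System.out.println("edits dirs: " + conf.get("dfs.name.edits.dir"));
      }
    }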
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=690418&r1=690417&r2=690418&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Aug 29 16:13:12 2008
@@ -31,6 +31,9 @@
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
/**
* This class tests the creation and validation of a checkpoint.
@@ -85,6 +88,7 @@
// start a cluster
Collection<File> namedirs = null;
+ Collection<File> editsdirs = null;
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes,
true, true, null, null);
@@ -94,6 +98,7 @@
try {
namedirs = cluster.getNameDirs();
+ editsdirs = cluster.getNameEditsDirs();
} finally {
fileSys.close();
cluster.shutdown();
@@ -105,7 +110,7 @@
numdirs++;
}
- FSImage fsimage = new FSImage(namedirs);
+ FSImage fsimage = new FSImage(namedirs, editsdirs);
FSEditLog editLog = fsimage.getEditLog();
// set small size of flush buffer
@@ -136,8 +141,9 @@
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
//
- for (int i = 0; i < numdirs; i++) {
- File editFile = fsimage.getEditFile(i);
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
System.out.println("Verifying file: " + editFile);
int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile));
int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();
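
One detail worth calling out from the assertions above: an "empty" edit log is
not zero bytes but Integer.SIZE/Byte.SIZE (4) bytes long. A stand-alone check
in the same spirit, assuming the <dir>/current/edits layout implied by the
patch; the directory list and paths here are hypothetical:

    import java.io.File;

    public class EditsFileCheck {
      public static void main(String[] args) {
        // Hypothetical edits directories, as dfs.name.edits.dir would list them.
        String[] editsDirs = { "/disk2/dfs/edits" };
        // The length the tests above expect of an edits file with no transactions.
        long emptyLen = Integer.SIZE / Byte.SIZE;
        for (String d : editsDirs) {
          File edits = new File(new File(d, "current"), "edits");
          if (!edits.exists())
            System.out.println(edits + ": missing");
          else if (edits.length() == emptyLen)
            System.out.println(edits + ": present and empty");
          else
            System.out.println(edits + ": " + edits.length() + " bytes of transactions");
        }
      }
    }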