You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/11/16 23:23:53 UTC
svn commit: r1035841 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
Author: jghoman
Date: Tue Nov 16 22:23:52 2010
New Revision: 1035841
URL: http://svn.apache.org/viewvc?rev=1035841&view=rev
Log:
HDFS-1071. savenamespace should write the fsimage to all configured fs.name.dir in parallel. Contributed by Dmytro Molkov.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 16 22:23:52 2010
@@ -200,6 +200,9 @@ Trunk (unreleased changes)
HDFS-455. Make NN and DN handle in a intuitive way comma-separated
configuration strings. (Michele Catasta via eli)
+ HDFS-1071. savenamespace should write the fsimage to all configured
+ fs.name.dir in parallel (Dmytro Molkov via jghoman)
+
OPTIMIZATIONS
HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Nov 16 22:23:52 2010
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -268,7 +269,7 @@ public class FSEditLog {
* except fsimage.processIOError)
*/
synchronized void processIOError(
- ArrayList<EditLogOutputStream> errorStreams,
+ List<EditLogOutputStream> errorStreams,
boolean propagate) {
if (errorStreams == null || errorStreams.size() == 0) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Nov 16 22:23:52 2010
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -172,7 +173,13 @@ public class FSImage extends Storage {
/**
* Used for saving the image to disk
*/
- static private final FsPermission FILE_PERM = new FsPermission((short)0);
+ static private final ThreadLocal<FsPermission> FILE_PERM =
+ new ThreadLocal<FsPermission>() {
+ @Override
+ protected FsPermission initialValue() {
+ return new FsPermission((short) 0);
+ }
+ };
static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
/**
@@ -850,30 +857,34 @@ public class FSImage extends Storage {
* @param propagate - flag, if set - then call corresponding EditLog stream's
* processIOError function.
*/
- void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) {
+ void processIOError(List<StorageDirectory> sds, boolean propagate) {
ArrayList<EditLogOutputStream> al = null;
- for(StorageDirectory sd:sds) {
- // if has a stream assosiated with it - remove it too..
- if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
- EditLogOutputStream eStream = editLog.getEditsStream(sd);
- if(al == null) al = new ArrayList<EditLogOutputStream>(1);
- al.add(eStream);
- }
-
- for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
- StorageDirectory sd1 = it.next();
- if (sd.equals(sd1)) {
- //add storage to the removed list
- LOG.warn("FSImage:processIOError: removing storage: "
- + sd.getRoot().getPath());
- try {
- sd1.unlock(); //unlock before removing (in case it will be restored)
- } catch (Exception e) {
- // nothing
+ synchronized (sds) {
+ for (StorageDirectory sd : sds) {
+ // if it has a stream associated with it, remove that stream too
+ if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ EditLogOutputStream eStream = editLog.getEditsStream(sd);
+ if (al == null)
+ al = new ArrayList<EditLogOutputStream>(1);
+ al.add(eStream);
+ }
+
+ for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+ StorageDirectory sd1 = it.next();
+ if (sd.equals(sd1)) {
+ // add storage to the removed list
+ LOG.warn("FSImage:processIOError: removing storage: "
+ + sd.getRoot().getPath());
+ try {
+ sd1.unlock(); // unlock before removing (in case it will be
+ // restored)
+ } catch (Exception e) {
+ // nothing
+ }
+ removedStorageDirs.add(sd1);
+ it.remove();
+ break;
}
- removedStorageDirs.add(sd1);
- it.remove();
- break;
}
}
}
@@ -1409,6 +1420,53 @@ public class FSImage extends Storage {
this.imageDigest = digest;
}
/**
+ * FSImageSaver runs in a separate thread when saving the FSImage.
+ * One thread is started per storage directory being saved.
+ *
+ * FSImageSaver assumes that it was launched from a thread that holds
+ * the FSNamesystem lock and that waits for the FSImageSaver thread
+ * to finish.
+ * This way we are guaranteed that the namespace is not being updated
+ * while multiple instances of FSImageSaver are traversing it
+ * and writing it out.
+ */
+  private class FSImageSaver implements Runnable {
+    /** Storage directory this saver writes to. */
+    private final StorageDirectory sd;
+    /**
+     * Caller-owned list that collects directories whose save failed.
+     * Sibling savers append to it concurrently, which is why saveNamespace
+     * hands in a Collections.synchronizedList.
+     */
+    private final List<StorageDirectory> errorSDs;
+
+    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs) {
+      this.sd = sd;
+      this.errorSDs = errorSDs;
+    }
+
+    /**
+     * Calls saveCurrent for {@link #sd}. Any failure (not only IOException)
+     * records the directory in errorSDs. Otherwise an unchecked exception
+     * would end this thread without the directory being marked as failed,
+     * and the joining caller could not tell that the save went wrong.
+     */
+    @Override
+    public void run() {
+      try {
+        saveCurrent(sd);
+      } catch (Throwable t) {
+        LOG.error("Unable to save image for " + sd.getRoot(), t);
+        errorSDs.add(sd);
+      }
+    }
+
+    /** Also used as the saver thread's name. */
+    @Override
+    public String toString() {
+      return "FSImageSaver for " + sd.getRoot() +
+             " of type " + sd.getStorageDirType();
+    }
+  }
+
+  /**
+   * Blocks until every thread in {@code threads} has terminated.
+   *
+   * An interrupt does not cut the wait short. The saver threads may still be
+   * writing storage directories, and the caller relies on them having
+   * finished before it proceeds. The interrupt is logged (with its
+   * exception) and the join is retried.
+   * NOTE(review): the interrupt status is consumed here and never restored.
+   * Re-asserting it before returning may be preferable, but first confirm it
+   * cannot break later interruptible (NIO channel) I/O done by the caller.
+   *
+   * @param threads threads to wait for; may be empty
+   */
+  private void waitForThreads(List<Thread> threads) {
+    for (Thread thread : threads) {
+      while (thread.isAlive()) {
+        try {
+          thread.join();
+        } catch (InterruptedException iex) {
+          LOG.error("Caught exception while waiting for thread " +
+                    thread.getName() + " to finish. Retrying join", iex);
+        }
+      }
+    }
+  }
+ /**
* Save the contents of the FS image and create empty edits.
*
* In order to minimize the recovery effort in case of failure during
@@ -1428,7 +1486,8 @@ public class FSImage extends Storage {
editLog.close();
if(renewCheckpointTime)
this.checkpointTime = now();
- ArrayList<StorageDirectory> errorSDs = new ArrayList<StorageDirectory>();
+ List<StorageDirectory> errorSDs =
+ Collections.synchronizedList(new ArrayList<StorageDirectory>());
// mv current -> lastcheckpoint.tmp
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
@@ -1441,17 +1500,18 @@ public class FSImage extends Storage {
}
}
+ List<Thread> saveThreads = new ArrayList<Thread>();
// save images into current
for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
it.hasNext();) {
StorageDirectory sd = it.next();
- try {
- saveCurrent(sd);
- } catch(IOException ie) {
- LOG.error("Unable to save image for " + sd.getRoot(), ie);
- errorSDs.add(sd);
- }
+ FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+ Thread saveThread = new Thread(saver, saver.toString());
+ saveThreads.add(saveThread);
+ saveThread.start();
}
+ waitForThreads(saveThreads);
+ saveThreads.clear();
// -NOTE-
// If NN has image-only and edits-only storage directories and fails here
@@ -1462,18 +1522,17 @@ public class FSImage extends Storage {
// to the old state contained in their lastcheckpoint.tmp.
// The edits directories should be discarded during startup because their
// checkpointTime is older than that of image directories.
-
// recreate edits in current
for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
it.hasNext();) {
- StorageDirectory sd = it.next();
- try {
- saveCurrent(sd);
- } catch(IOException ie) {
- LOG.error("Unable to save edits for " + sd.getRoot(), ie);
- errorSDs.add(sd);
- }
+ final StorageDirectory sd = it.next();
+ FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+ Thread saveThread = new Thread(saver, saver.toString());
+ saveThreads.add(saveThread);
+ saveThread.start();
}
+ waitForThreads(saveThreads);
+
// mv lastcheckpoint.tmp -> previous.checkpoint
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -1605,6 +1664,7 @@ public class FSImage extends Storage {
int nameLen = name.position();
out.writeShort(nameLen);
out.write(name.array(), name.arrayOffset(), nameLen);
+ FsPermission filePerm = FILE_PERM.get();
if (node.isDirectory()) {
out.writeShort(0); // replication
out.writeLong(node.getModificationTime());
@@ -1613,10 +1673,10 @@ public class FSImage extends Storage {
out.writeInt(-1); // # of blocks
out.writeLong(node.getNsQuota());
out.writeLong(node.getDsQuota());
- FILE_PERM.fromShort(node.getFsPermissionShort());
+ filePerm.fromShort(node.getFsPermissionShort());
PermissionStatus.write(out, node.getUserName(),
node.getGroupName(),
- FILE_PERM);
+ filePerm);
} else if (node.isLink()) {
out.writeShort(0); // replication
out.writeLong(0); // modification time
@@ -1624,10 +1684,10 @@ public class FSImage extends Storage {
out.writeLong(0); // preferred block size
out.writeInt(-2); // # of blocks
Text.writeString(out, ((INodeSymlink)node).getLinkValue());
- FILE_PERM.fromShort(node.getFsPermissionShort());
+ filePerm.fromShort(node.getFsPermissionShort());
PermissionStatus.write(out, node.getUserName(),
node.getGroupName(),
- FILE_PERM);
+ filePerm);
} else {
INodeFile fileINode = (INodeFile)node;
out.writeShort(fileINode.getReplication());
@@ -1638,10 +1698,10 @@ public class FSImage extends Storage {
out.writeInt(blocks.length);
for (Block blk : blocks)
blk.write(out);
- FILE_PERM.fromShort(fileINode.getFsPermissionShort());
+ filePerm.fromShort(fileINode.getFsPermissionShort());
PermissionStatus.write(out, fileINode.getUserName(),
fileINode.getGroupName(),
- FILE_PERM);
+ filePerm);
}
}