You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/26 10:25:49 UTC
svn commit: r425672 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/FSDirectory.java
src/java/org/apache/hadoop/dfs/FSEditLog.java
src/java/org/apache/hadoop/dfs/FSImage.java
Author: cutting
Date: Wed Jul 26 01:25:49 2006
New Revision: 425672
URL: http://svn.apache.org/viewvc?rev=425672&view=rev
Log:
HADOOP-335. Refactor namenode logging. Contributed by Konstantin.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=425672&r1=425671&r2=425672&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 26 01:25:49 2006
@@ -90,6 +90,9 @@
writing zero-compressed integers (VInts and VLongs).
(Hairong Kuang via cutting)
+26. HADOOP-335. Refactor DFS namespace/transaction logging in
+ namenode. (Konstantin Shvachko via cutting)
+
Release 0.4.0 - 2006-06-28
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=425672&r1=425671&r2=425672&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Jul 26 01:25:49 2006
@@ -21,7 +21,6 @@
import java.util.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.Metrics;
@@ -34,21 +33,12 @@
* It keeps the filename->blockset mapping always-current
* and logged to disk.
*
+ * TODO: Factor out to a standalone class.
+ *
* @author Mike Cafarella
*************************************************/
class FSDirectory implements FSConstants {
- private static final String FS_IMAGE = "fsimage";
- private static final String NEW_FS_IMAGE = "fsimage.new";
- private static final String OLD_FS_IMAGE = "fsimage.old";
-
- private static final byte OP_ADD = 0;
- private static final byte OP_RENAME = 1;
- private static final byte OP_DELETE = 2;
- private static final byte OP_MKDIR = 3;
- private static final byte OP_SET_REPLICATION = 4;
- private int numFilesDeleted = 0;
-
/******************************************************
* We keep an in-memory representation of the file/block
* hierarchy.
@@ -92,6 +82,38 @@
public short getReplication() {
return this.blockReplication;
}
+
+ /**
+ * Get local file name
+ * @return local file name
+ */
+ String getLocalName() {
+ return name;
+ }
+
+ /**
+ * Get file blocks
+ * @return file blocks
+ */
+ Block[] getBlocks() {
+ return this.blocks;
+ }
+
+ /**
+ * Get parent directory
+ * @return parent INode
+ */
+ INode getParent() {
+ return this.parent;
+ }
+
+ /**
+ * Get children
+ * @return TreeMap of children
+ */
+ TreeMap getChildren() {
+ return this.children;
+ }
/**
* This is the external interface
@@ -182,7 +204,7 @@
return true;
}
}
-
+
/**
* Collect all the blocks at this INode and all its children.
* This operation is performed after a node is removed from the tree,
@@ -269,103 +291,43 @@
v.add(child);
}
}
-
- /**
- */
- void saveImage(String parentPrefix, DataOutputStream out) throws IOException {
- String fullName = "";
- if (parent != null) {
- fullName = parentPrefix + "/" + name;
- new UTF8(fullName).write(out);
- out.writeShort(blockReplication);
- if (blocks == null) {
- out.writeInt(0);
- } else {
- out.writeInt(blocks.length);
- for (int i = 0; i < blocks.length; i++) {
- blocks[i].write(out);
- }
- }
- }
- for (Iterator it = children.values().iterator(); it.hasNext(); ) {
- INode child = (INode) it.next();
- child.saveImage(fullName, out);
- }
- }
}
INode rootDir = new INode("");
TreeMap activeBlocks = new TreeMap();
TreeMap activeLocks = new TreeMap();
- DataOutputStream editlog = null;
+ FSImage fsImage;
boolean ready = false;
- int namespaceID = 0; /// a persistent attribute of the namespace
-
+ int namespaceID = 0; // TODO: move to FSImage class, it belongs there
+ // Metrics members
private MetricsRecord metricsRecord = null;
+ private int numFilesDeleted = 0;
/** Access an existing dfs name directory. */
public FSDirectory(File dir, Configuration conf) throws IOException {
- File fullimage = new File(dir, "image");
- if (! fullimage.exists()) {
- throw new IOException("NameNode not formatted: " + dir);
- }
- File edits = new File(dir, "edits");
- if (loadFSImage(fullimage, edits, conf)) {
- saveFSImage(fullimage, edits);
- }
-
- synchronized (this) {
- this.ready = true;
- this.notifyAll();
- this.editlog = new DataOutputStream(new FileOutputStream(edits));
- editlog.writeInt( DFS_CURRENT_VERSION );
- }
-
- metricsRecord = Metrics.createRecord("dfs", "namenode");
+ this.fsImage = new FSImage( dir, conf );
+ fsImage.loadFSImage( this, conf );
+ synchronized (this) {
+ this.ready = true;
+ this.notifyAll();
+ fsImage.getEditLog().create();
+ }
+ metricsRecord = Metrics.createRecord("dfs", "namenode");
}
/** Create a new dfs name directory. Caution: this destroys all files
- * in this filesystem. */
- public static void format(File dir, Configuration conf)
- throws IOException {
- File image = new File(dir, "image");
- File edits = new File(dir, "edits");
-
- if (!((!image.exists() || FileUtil.fullyDelete(image)) &&
- (!edits.exists() || edits.delete()) &&
- image.mkdirs())) {
-
- throw new IOException("Unable to format: "+dir);
- }
+ * in this filesystem.
+ * @deprecated use {@link FSImage#format(File, Configuration)} instead */
+ public static void format(File dir, Configuration conf) throws IOException {
+ FSImage.format( dir, conf );
}
/**
- * Generate new namespaceID.
- *
- * namespaceID is a persistent attribute of the namespace.
- * It is generated when the namenode is formatted and remains the same
- * during the life cycle of the namenode.
- * When a datanodes register they receive it as the registrationID,
- * which is checked every time the datanode is communicating with the
- * namenode. Datanodes that do not 'know' the namespaceID are rejected.
- *
- * @return new namespaceID
- */
- private int newNamespaceID() {
- Random r = new Random();
- r.setSeed( System.currentTimeMillis() );
- int newID = 0;
- while( newID == 0)
- newID = r.nextInt();
- return newID;
- }
-
- /**
* Shutdown the filestore
*/
public void close() throws IOException {
- editlog.close();
+ fsImage.getEditLog().close();
}
/**
@@ -385,283 +347,6 @@
}
/**
- * Load in the filesystem image. It's a big list of
- * filenames and blocks. Return whether we should
- * "re-save" and consolidate the edit-logs
- */
- boolean loadFSImage( File fsdir,
- File edits,
- Configuration conf
- ) throws IOException {
- //
- // Atomic move sequence, to recover from interrupted save
- //
- File curFile = new File(fsdir, FS_IMAGE);
- File newFile = new File(fsdir, NEW_FS_IMAGE);
- File oldFile = new File(fsdir, OLD_FS_IMAGE);
-
- // Maybe we were interrupted between 2 and 4
- if (oldFile.exists() && curFile.exists()) {
- oldFile.delete();
- if (edits.exists()) {
- edits.delete();
- }
- } else if (oldFile.exists() && newFile.exists()) {
- // Or maybe between 1 and 2
- newFile.renameTo(curFile);
- oldFile.delete();
- } else if (curFile.exists() && newFile.exists()) {
- // Or else before stage 1, in which case we lose the edits
- newFile.delete();
- }
-
- //
- // Load in bits
- //
- boolean needToSave = true;
- int imgVersion = DFS_CURRENT_VERSION;
- if (curFile.exists()) {
- DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
- try {
- // read image version: first appeared in version -1
- imgVersion = in.readInt();
- // read namespaceID: first appeared in version -2
- if( imgVersion <= -2 )
- namespaceID = in.readInt();
- // read number of files
- int numFiles = 0;
- // version 0 does not store version #
- // starts directly with the number of files
- if( imgVersion >= 0 ) {
- numFiles = imgVersion;
- imgVersion = 0;
- } else
- numFiles = in.readInt();
-
- needToSave = ( imgVersion != DFS_CURRENT_VERSION );
- if( imgVersion < DFS_CURRENT_VERSION ) // future version
- throw new IOException(
- "Unsupported version of the file system image: "
- + imgVersion
- + ". Current version = "
- + DFS_CURRENT_VERSION + "." );
-
- // read file info
- short replication = (short)conf.getInt("dfs.replication", 3);
- for (int i = 0; i < numFiles; i++) {
- UTF8 name = new UTF8();
- name.readFields(in);
- // version 0 does not support per file replication
- if( !(imgVersion >= 0) ) {
- replication = in.readShort(); // other versions do
- replication = adjustReplication( replication, conf );
- }
- int numBlocks = in.readInt();
- Block blocks[] = null;
- if (numBlocks > 0) {
- blocks = new Block[numBlocks];
- for (int j = 0; j < numBlocks; j++) {
- blocks[j] = new Block();
- blocks[j].readFields(in);
- }
- }
- unprotectedAddFile(name,
- new INode( name.toString(), blocks, replication ));
- }
- } finally {
- in.close();
- }
- }
-
- if( namespaceID == 0 )
- namespaceID = newNamespaceID();
-
- return needToSave || ( edits.exists() && loadFSEdits(edits, conf) > 0 );
- }
-
- /**
- * Load an edit log, and apply the changes to the in-memory structure
- *
- * This is where we apply edits that we've been writing to disk all
- * along.
- */
- int loadFSEdits(File edits, Configuration conf) throws IOException {
- int numEdits = 0;
- int logVersion = 0;
-
- if (edits.exists()) {
- DataInputStream in = new DataInputStream(
- new BufferedInputStream(
- new FileInputStream(edits)));
- // Read log file version. Could be missing.
- in.mark( 4 );
- if( in.available() > 0 ) {
- logVersion = in.readByte();
- in.reset();
- if( logVersion >= 0 )
- logVersion = 0;
- else
- logVersion = in.readInt();
- if( logVersion < DFS_CURRENT_VERSION ) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion
- + ". Current version = "
- + DFS_CURRENT_VERSION + "." );
- }
-
- short replication = (short)conf.getInt("dfs.replication", 3);
- try {
- while (in.available() > 0) {
- byte opcode = in.readByte();
- numEdits++;
- switch (opcode) {
- case OP_ADD: {
- UTF8 name = new UTF8();
- ArrayWritable aw = null;
- Writable writables[];
- // version 0 does not support per file replication
- if( logVersion >= 0 )
- name.readFields(in); // read name only
- else { // other versions do
- // get name and replication
- aw = new ArrayWritable(UTF8.class);
- aw.readFields(in);
- writables = aw.get();
- if( writables.length != 2 )
- throw new IOException("Incorrect data fortmat. "
- + "Name & replication pair expected");
- name = (UTF8) writables[0];
- replication = Short.parseShort(
- ((UTF8)writables[1]).toString());
- replication = adjustReplication( replication, conf );
- }
- // get blocks
- aw = new ArrayWritable(Block.class);
- aw.readFields(in);
- writables = aw.get();
- Block blocks[] = new Block[writables.length];
- System.arraycopy(writables, 0, blocks, 0, blocks.length);
- // add to the file tree
- unprotectedAddFile(name,
- new INode( name.toString(), blocks, replication ));
- break;
- }
- case OP_SET_REPLICATION: {
- UTF8 src = new UTF8();
- UTF8 repl = new UTF8();
- src.readFields(in);
- repl.readFields(in);
- replication=adjustReplication(
- fromLogReplication(repl),
- conf);
- unprotectedSetReplication(src.toString(),
- replication,
- null);
- break;
- }
- case OP_RENAME: {
- UTF8 src = new UTF8();
- UTF8 dst = new UTF8();
- src.readFields(in);
- dst.readFields(in);
- unprotectedRenameTo(src, dst);
- break;
- }
- case OP_DELETE: {
- UTF8 src = new UTF8();
- src.readFields(in);
- unprotectedDelete(src);
- break;
- }
- case OP_MKDIR: {
- UTF8 src = new UTF8();
- src.readFields(in);
- unprotectedMkdir(src.toString());
- break;
- }
- default: {
- throw new IOException("Never seen opcode " + opcode);
- }
- }
- }
- } finally {
- in.close();
- }
- }
-
- if( logVersion != DFS_CURRENT_VERSION ) // other version
- numEdits++; // save this image asap
- return numEdits;
- }
-
- private static short adjustReplication( short replication, Configuration conf) {
- short minReplication = (short)conf.getInt("dfs.replication.min", 1);
- if( replication<minReplication ) {
- replication = minReplication;
- }
- short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
- if( replication>maxReplication ) {
- replication = maxReplication;
- }
- return replication;
- }
- /**
- * Save the contents of the FS image
- */
- void saveFSImage(File fullimage, File edits) throws IOException {
- File curFile = new File(fullimage, FS_IMAGE);
- File newFile = new File(fullimage, NEW_FS_IMAGE);
- File oldFile = new File(fullimage, OLD_FS_IMAGE);
-
- //
- // Write out data
- //
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
- try {
- out.writeInt(DFS_CURRENT_VERSION);
- out.writeInt(this.namespaceID);
- out.writeInt(rootDir.numItemsInTree() - 1);
- rootDir.saveImage("", out);
- } finally {
- out.close();
- }
-
- //
- // Atomic move sequence
- //
- // 1. Move cur to old
- curFile.renameTo(oldFile);
-
- // 2. Move new to cur
- newFile.renameTo(curFile);
-
- // 3. Remove pending-edits file (it's been integrated with newFile)
- edits.delete();
-
- // 4. Delete old
- oldFile.delete();
- }
-
- /**
- * Write an operation to the edit log
- */
- void logEdit(byte op, Writable w1, Writable w2) {
- synchronized (editlog) {
- try {
- editlog.write(op);
- if (w1 != null) {
- w1.write(editlog);
- }
- if (w2 != null) {
- w2.write(editlog);
- }
- } catch (IOException ie) {
- }
- }
- }
-
- /**
* Add the given filename to the fs.
*/
public boolean addFile(UTF8 path, Block[] blocks, short replication) {
@@ -677,26 +362,13 @@
+blocks.length+" blocks to the file system" );
return false;
}
- // add createRecord file record to log
- UTF8 nameReplicationPair[] = new UTF8[] {
- path,
- toLogReplication( replication )};
- logEdit(OP_ADD,
- new ArrayWritable( UTF8.class, nameReplicationPair ),
- new ArrayWritable( Block.class, newNode.blocks ));
+ // add create file record to log
+ fsImage.getEditLog().logCreateFile( newNode );
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+path+" with "+blocks.length+" blocks is added to the file system" );
return true;
}
- private static UTF8 toLogReplication( short replication ) {
- return new UTF8( Short.toString(replication));
- }
-
- private static short fromLogReplication( UTF8 replication ) {
- return Short.parseShort(replication.toString());
- }
-
/**
*/
boolean unprotectedAddFile(UTF8 path, INode newNode) {
@@ -716,20 +388,23 @@
}
}
}
+
+ boolean unprotectedAddFile(UTF8 path, Block[] blocks, short replication ) {
+ return unprotectedAddFile( path,
+ new INode( path.toString(), blocks, replication ));
+ }
/**
* Change the filename
*/
public boolean renameTo(UTF8 src, UTF8 dst) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
- +src+" to "+dst );
- waitForReady();
- if (unprotectedRenameTo(src, dst)) {
- logEdit(OP_RENAME, src, dst);
- return true;
- } else {
- return false;
- }
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
+ +src+" to "+dst );
+ waitForReady();
+ if( ! unprotectedRenameTo(src, dst) )
+ return false;
+ fsImage.getEditLog().logRename(src, dst);
+ return true;
}
/**
@@ -783,23 +458,21 @@
* @return array of file blocks
* @throws IOException
*/
- public Block[] setReplication(String src,
- short replication,
- Vector oldReplication
- ) throws IOException {
+ Block[] setReplication( String src,
+ short replication,
+ Vector oldReplication
+ ) throws IOException {
waitForReady();
Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication );
- if( fileBlocks != null ) //
- logEdit(OP_SET_REPLICATION,
- new UTF8(src),
- toLogReplication( replication ));
+ if( fileBlocks != null ) // log replication change
+ fsImage.getEditLog().logSetReplication( src, replication );
return fileBlocks;
}
- private Block[] unprotectedSetReplication( String src,
- short replication,
- Vector oldReplication
- ) throws IOException {
+ Block[] unprotectedSetReplication( String src,
+ short replication,
+ Vector oldReplication
+ ) throws IOException {
if( oldReplication == null )
oldReplication = new Vector();
oldReplication.setSize(1);
@@ -847,7 +520,7 @@
waitForReady();
Block[] blocks = unprotectedDelete(src);
if( blocks != null )
- logEdit(OP_DELETE, src, null);
+ fsImage.getEditLog().logDelete( src );
return blocks;
}
@@ -984,7 +657,7 @@
}
/**
- * Create the given directory and all its parent dirs.
+ * @deprecated use {@link #mkdirs(String)} instead
*/
public boolean mkdirs(UTF8 src) {
return mkdirs(src.toString());
@@ -1019,7 +692,7 @@
if (inserted != null) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+"created directory "+cur );
- logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
+ fsImage.getEditLog().logMkDir( inserted );
} // otherwise cur exists, continue
} catch (FileNotFoundException e ) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=425672&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Wed Jul 26 01:25:49 2006
@@ -0,0 +1,270 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * FSEditLog maintains a log of the namespace modifications.
+ *
+ * @author Konstantin Shvachko
+ */
+class FSEditLog {
+ private static final byte OP_ADD = 0;
+ private static final byte OP_RENAME = 1;
+ private static final byte OP_DELETE = 2;
+ private static final byte OP_MKDIR = 3;
+ private static final byte OP_SET_REPLICATION = 4;
+
+ private File editsFile;
+ DataOutputStream editsStream = null;
+
+ FSEditLog( File edits ) {
+ this.editsFile = edits;
+ }
+
+ File getEditsFile() {
+ return this.editsFile;
+ }
+
+ /**
+ * Initialize the output stream for logging.
+ *
+ * @throws IOException
+ */
+ void create() throws IOException {
+ editsStream = new DataOutputStream(new FileOutputStream(editsFile));
+ editsStream.writeInt( FSConstants.DFS_CURRENT_VERSION );
+ }
+
+ /**
+ * Close the edit log output stream
+ */
+ void close() throws IOException {
+ editsStream.close();
+ }
+
+ /**
+ * Load an edit log, and apply the changes to the in-memory structure
+ *
+ * This is where we apply edits that we've been writing to disk all
+ * along.
+ */
+ int loadFSEdits( FSDirectory fsDir,
+ Configuration conf
+ ) throws IOException {
+ int numEdits = 0;
+ int logVersion = 0;
+
+ if (editsFile.exists()) {
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(
+ new FileInputStream(editsFile)));
+ // Read log file version. Could be missing.
+ in.mark( 4 );
+ if( in.available() > 0 ) {
+ logVersion = in.readByte();
+ in.reset();
+ if( logVersion >= 0 )
+ logVersion = 0;
+ else
+ logVersion = in.readInt();
+ if( logVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion
+ + ". Current version = "
+ + FSConstants.DFS_CURRENT_VERSION + "." );
+ }
+
+ short replication = (short)conf.getInt("dfs.replication", 3);
+ try {
+ while (in.available() > 0) {
+ byte opcode = in.readByte();
+ numEdits++;
+ switch (opcode) {
+ case OP_ADD: {
+ UTF8 name = new UTF8();
+ ArrayWritable aw = null;
+ Writable writables[];
+ // version 0 does not support per file replication
+ if( logVersion >= 0 )
+ name.readFields(in); // read name only
+ else { // other versions do
+ // get name and replication
+ aw = new ArrayWritable(UTF8.class);
+ aw.readFields(in);
+ writables = aw.get();
+ if( writables.length != 2 )
+ throw new IOException("Incorrect data fortmat. "
+ + "Name & replication pair expected");
+ name = (UTF8) writables[0];
+ replication = Short.parseShort(
+ ((UTF8)writables[1]).toString());
+ replication = adjustReplication( replication, conf );
+ }
+ // get blocks
+ aw = new ArrayWritable(Block.class);
+ aw.readFields(in);
+ writables = aw.get();
+ Block blocks[] = new Block[writables.length];
+ System.arraycopy(writables, 0, blocks, 0, blocks.length);
+ // add to the file tree
+ fsDir.unprotectedAddFile(name, blocks, replication );
+ break;
+ }
+ case OP_SET_REPLICATION: {
+ UTF8 src = new UTF8();
+ UTF8 repl = new UTF8();
+ src.readFields(in);
+ repl.readFields(in);
+ replication = adjustReplication(
+ fromLogReplication(repl),
+ conf);
+ fsDir.unprotectedSetReplication(src.toString(),
+ replication,
+ null);
+ break;
+ }
+ case OP_RENAME: {
+ UTF8 src = new UTF8();
+ UTF8 dst = new UTF8();
+ src.readFields(in);
+ dst.readFields(in);
+ fsDir.unprotectedRenameTo(src, dst);
+ break;
+ }
+ case OP_DELETE: {
+ UTF8 src = new UTF8();
+ src.readFields(in);
+ fsDir.unprotectedDelete(src);
+ break;
+ }
+ case OP_MKDIR: {
+ UTF8 src = new UTF8();
+ src.readFields(in);
+ fsDir.unprotectedMkdir(src.toString());
+ break;
+ }
+ default: {
+ throw new IOException("Never seen opcode " + opcode);
+ }
+ }
+ }
+ } finally {
+ in.close();
+ }
+ }
+
+ if( logVersion != FSConstants.DFS_CURRENT_VERSION ) // other version
+ numEdits++; // save this image asap
+ return numEdits;
+ }
+
+ static short adjustReplication( short replication, Configuration conf) {
+ short minReplication = (short)conf.getInt("dfs.replication.min", 1);
+ if( replication<minReplication ) {
+ replication = minReplication;
+ }
+ short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
+ if( replication>maxReplication ) {
+ replication = maxReplication;
+ }
+ return replication;
+ }
+
+ /**
+ * Write an operation to the edit log
+ */
+ void logEdit(byte op, Writable w1, Writable w2) {
+ synchronized (editsStream) {
+ try {
+ editsStream.write(op);
+ if (w1 != null) {
+ w1.write(editsStream);
+ }
+ if (w2 != null) {
+ w2.write(editsStream);
+ }
+ } catch (IOException ie) {
+ // TODO: Must report an error here
+ }
+ }
+ // TODO: initialize checkpointing if the log is large enough
+ }
+
+ /**
+ * Add create file record to edit log
+ */
+ void logCreateFile( FSDirectory.INode newNode ) {
+ UTF8 nameReplicationPair[] = new UTF8[] {
+ new UTF8( newNode.computeName() ),
+ FSEditLog.toLogReplication( newNode.getReplication() )};
+ logEdit(OP_ADD,
+ new ArrayWritable( UTF8.class, nameReplicationPair ),
+ new ArrayWritable( Block.class, newNode.getBlocks() ));
+ }
+
+ /**
+ * Add create directory record to edit log
+ */
+ void logMkDir( FSDirectory.INode newNode ) {
+ logEdit(OP_MKDIR, new UTF8( newNode.computeName() ), null );
+ }
+
+ /**
+ * Add rename record to edit log
+ * TODO: use String parameters until just before writing to disk
+ */
+ void logRename( UTF8 src, UTF8 dst ) {
+ logEdit(OP_RENAME, src, dst);
+ }
+
+ /**
+ * Add set replication record to edit log
+ */
+ void logSetReplication( String src, short replication ) {
+ logEdit(OP_SET_REPLICATION,
+ new UTF8(src),
+ FSEditLog.toLogReplication( replication ));
+ }
+
+ /**
+ * Add delete file record to edit log
+ */
+ void logDelete( UTF8 src ) {
+ logEdit(OP_DELETE, src, null);
+ }
+
+ static UTF8 toLogReplication( short replication ) {
+ return new UTF8( Short.toString(replication));
+ }
+
+ static short fromLogReplication( UTF8 replication ) {
+ return Short.parseShort(replication.toString());
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=425672&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Wed Jul 26 01:25:49 2006
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSDirectory.INode;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * FSImage handles checkpointing and logging of the namespace edits.
+ *
+ * @author Konstantin Shvachko
+ */
+class FSImage {
+ private static final String FS_IMAGE = "fsimage";
+ private static final String NEW_FS_IMAGE = "fsimage.new";
+ private static final String OLD_FS_IMAGE = "fsimage.old";
+
+ private File imageDir; /// directory that contains the image file
+ private FSEditLog editLog;
+ // private int namespaceID = 0; /// a persistent attribute of the namespace
+
+ /**
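+ * Locate the "image" directory and "edits" file under fsDir; fail if the image directory does not exist.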
+ */
+ FSImage( File fsDir, Configuration conf ) throws IOException {
+ this.imageDir = new File(fsDir, "image");
+ if (! imageDir.exists()) {
+ throw new IOException("NameNode not formatted: " + fsDir);
+ }
+ File edits = new File(fsDir, "edits");
+ this.editLog = new FSEditLog( edits );
+ }
+
+ FSEditLog getEditLog() {
+ return editLog;
+ }
+
+ /**
+ * Load in the filesystem image. It's a big list of
+ * filenames and blocks. After applying any pending edits, re-save the
+ * image if it is missing, out of date, or edits were loaded.
+ */
+ void loadFSImage( FSDirectory fsDir,
+ Configuration conf
+ ) throws IOException {
+ File edits = editLog.getEditsFile();
+ //
+ // Atomic move sequence, to recover from interrupted save
+ //
+ File curFile = new File(imageDir, FS_IMAGE);
+ File newFile = new File(imageDir, NEW_FS_IMAGE);
+ File oldFile = new File(imageDir, OLD_FS_IMAGE);
+
+ // Maybe we were interrupted between 2 and 4
+ if (oldFile.exists() && curFile.exists()) {
+ oldFile.delete();
+ if (edits.exists()) {
+ edits.delete();
+ }
+ } else if (oldFile.exists() && newFile.exists()) {
+ // Or maybe between 1 and 2
+ newFile.renameTo(curFile);
+ oldFile.delete();
+ } else if (curFile.exists() && newFile.exists()) {
+ // Or else before stage 1, in which case we lose the edits
+ newFile.delete();
+ }
+
+ //
+ // Load in bits
+ //
+ boolean needToSave = true;
+ int imgVersion = FSConstants.DFS_CURRENT_VERSION;
+ if (curFile.exists()) {
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(
+ new FileInputStream(curFile)));
+ try {
+ // read image version: first appeared in version -1
+ imgVersion = in.readInt();
+ // read namespaceID: first appeared in version -2
+ if( imgVersion <= -2 )
+ fsDir.namespaceID = in.readInt();
+ // read number of files
+ int numFiles = 0;
+ // version 0 does not store version #
+ // starts directly with the number of files
+ if( imgVersion >= 0 ) {
+ numFiles = imgVersion;
+ imgVersion = 0;
+ } else
+ numFiles = in.readInt();
+
+ needToSave = ( imgVersion != FSConstants.DFS_CURRENT_VERSION );
+ if( imgVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+ throw new IOException(
+ "Unsupported version of the file system image: "
+ + imgVersion
+ + ". Current version = "
+ + FSConstants.DFS_CURRENT_VERSION + "." );
+
+ // read file info
+ short replication = (short)conf.getInt("dfs.replication", 3);
+ for (int i = 0; i < numFiles; i++) {
+ UTF8 name = new UTF8();
+ name.readFields(in);
+ // version 0 does not support per file replication
+ if( !(imgVersion >= 0) ) {
+ replication = in.readShort(); // other versions do
+ replication = FSEditLog.adjustReplication( replication, conf );
+ }
+ int numBlocks = in.readInt();
+ Block blocks[] = null;
+ if (numBlocks > 0) {
+ blocks = new Block[numBlocks];
+ for (int j = 0; j < numBlocks; j++) {
+ blocks[j] = new Block();
+ blocks[j].readFields(in);
+ }
+ }
+ fsDir.unprotectedAddFile(name, blocks, replication );
+ }
+ } finally {
+ in.close();
+ }
+ }
+
+ if( fsDir.namespaceID == 0 )
+ fsDir.namespaceID = newNamespaceID();
+
+ needToSave |= ( edits.exists() && editLog.loadFSEdits(fsDir, conf) > 0 );
+ if( needToSave )
+ saveFSImage( fsDir );
+ }
+
+ /**
+ * Save the contents of the FS image
+ */
+ void saveFSImage( FSDirectory fsDir ) throws IOException {
+ File curFile = new File(imageDir, FS_IMAGE);
+ File newFile = new File(imageDir, NEW_FS_IMAGE);
+ File oldFile = new File(imageDir, OLD_FS_IMAGE);
+
+ //
+ // Write out data
+ //
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
+ try {
+ out.writeInt(FSConstants.DFS_CURRENT_VERSION);
+ out.writeInt(fsDir.namespaceID);
+ out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
+ saveImage( "", fsDir.rootDir, out );
+ } finally {
+ out.close();
+ }
+
+ //
+ // Atomic move sequence
+ //
+ // 1. Move cur to old
+ curFile.renameTo(oldFile);
+ // 2. Move new to cur
+ newFile.renameTo(curFile);
+ // 3. Remove pending-edits file (it's been integrated with newFile)
+ editLog.getEditsFile().delete();
+ // 4. Delete old
+ oldFile.delete();
+ }
+
+ /**
+ * Generate new namespaceID.
+ *
+ * namespaceID is a persistent attribute of the namespace.
+ * It is generated when the namenode is formatted and remains the same
+ * during the life cycle of the namenode.
+ * When datanodes register they receive it as the registrationID,
+ * which is checked every time the datanode is communicating with the
+ * namenode. Datanodes that do not 'know' the namespaceID are rejected.
+ *
+ * @return new namespaceID
+ */
+ private int newNamespaceID() {
+ Random r = new Random();
+ r.setSeed( System.currentTimeMillis() );
+ int newID = 0;
+ while( newID == 0)
+ newID = r.nextInt();
+ return newID;
+ }
+
+ /** Create a new dfs name directory. Caution: this destroys all files
+ * in this filesystem. */
+ static void format(File dir, Configuration conf) throws IOException {
+ File image = new File(dir, "image");
+ File edits = new File(dir, "edits");
+
+ if (!((!image.exists() || FileUtil.fullyDelete(image)) &&
+ (!edits.exists() || edits.delete()) &&
+ image.mkdirs())) {
+ throw new IOException("Unable to format: "+dir);
+ }
+ }
+
+ /**
+ * Save file tree image starting from the given root.
+ */
+ void saveImage( String parentPrefix,
+ FSDirectory.INode root,
+ DataOutputStream out ) throws IOException {
+ String fullName = "";
+ if( root.getParent() != null) {
+ fullName = parentPrefix + "/" + root.getLocalName();
+ new UTF8(fullName).write(out);
+ out.writeShort( root.getReplication() );
+ if( root.isDir() ) {
+ out.writeInt(0);
+ } else {
+ int nrBlocks = root.getBlocks().length;
+ out.writeInt( nrBlocks );
+ for (int i = 0; i < nrBlocks; i++)
+ root.getBlocks()[i].write(out);
+ }
+ }
+ for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
+ INode child = (INode) it.next();
+ saveImage( fullName, child, out );
+ }
+ }
+}