You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2010/10/29 19:04:08 UTC
svn commit: r1028847 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: eli
Date: Fri Oct 29 17:04:07 2010
New Revision: 1028847
URL: http://svn.apache.org/viewvc?rev=1028847&view=rev
Log:
HDFS-1462. Refactor edit log loading to a separate class from edit log writing. Contributed by Todd Lipcon.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Oct 29 17:04:07 2010
@@ -157,6 +157,9 @@ Trunk (unreleased changes)
HDFS-259. Remove intentionally corrupt 0.13 directory layout creation.
(Todd Lipcon via eli)
+ HDFS-1462. Refactor edit log loading to a separate class from edit log writing.
+ (Todd Lipcon via eli)
+
OPTIMIZATIONS
HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Fri Oct 29 17:04:07 2010
@@ -43,6 +43,7 @@ public class BackupStorage extends FSIma
/** Backup input stream for loading edits into memory */
private EditLogBackupInputStream backupInputStream;
+
/** Is journal spooling in progress */
volatile JSpoolState jsState;
@@ -214,7 +215,8 @@ public class BackupStorage extends FSIma
waitSpoolEnd();
// update NameSpace in memory
backupInputStream.setBytes(data);
- editLog.loadEditRecords(getLayoutVersion(),
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ logLoader.loadEditRecords(getLayoutVersion(),
backupInputStream.getDataInputStream(), true);
getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
break;
@@ -334,11 +336,12 @@ public class BackupStorage extends FSIma
// load edits.new
EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
DataInputStream in = edits.getDataInputStream();
- numEdits += editLog.loadFSEdits(in, false);
-
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ numEdits += logLoader.loadFSEdits(in, false);
+
// first time reached the end of spool
jsState = JSpoolState.WAIT;
- numEdits += editLog.loadEditRecords(getLayoutVersion(), in, true);
+ numEdits += logLoader.loadEditRecords(getLayoutVersion(), in, true);
getFSNamesystem().dir.updateCountForINodeWithQuota();
edits.close();
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Oct 29 17:04:07 2010
@@ -147,7 +147,7 @@ class EditLogBackupOutputStream extends
JournalRecord jRec = null;
for(; idx < bufReadySize; idx++) {
jRec = bufReady.get(idx);
- if(jRec.op >= FSEditLog.OP_JSPOOL_START)
+ if(jRec.op >= FSEditLog.Ops.OP_JSPOOL_START)
break; // special operation should be sent in a separate call to BN
jRec.write(out);
}
@@ -177,7 +177,7 @@ class EditLogBackupOutputStream extends
private void send(int ja) throws IOException {
try {
int length = out.getLength();
- out.write(FSEditLog.OP_INVALID);
+ out.write(FSEditLog.Ops.OP_INVALID);
backupNode.journal(nnRegistration, ja, length, out.getData());
} finally {
out.reset();
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Oct 29 17:04:07 2010
@@ -128,7 +128,7 @@ class EditLogFileOutputStream extends Ed
@Override
void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
- write(FSEditLog.OP_INVALID); // insert end-of-file marker
+ write(FSEditLog.Ops.OP_INVALID); // insert end-of-file marker
DataOutputBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
@@ -189,7 +189,7 @@ class EditLogFileOutputStream extends Ed
*/
@Override
boolean isOperationSupported(byte op) {
- return op < FSEditLog.OP_JSPOOL_START - 1;
+ return op < FSEditLog.Ops.OP_JSPOOL_START - 1;
}
/**
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 29 17:04:07 2010
@@ -17,28 +17,20 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -53,8 +45,6 @@ import org.apache.hadoop.io.ArrayWritabl
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.security.token.delegation.DelegationKey;
/**
@@ -64,45 +54,50 @@ import org.apache.hadoop.security.token.
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLog {
- public static final byte OP_INVALID = -1;
- private static final byte OP_ADD = 0;
- private static final byte OP_RENAME_OLD = 1; // rename
- private static final byte OP_DELETE = 2; // delete
- private static final byte OP_MKDIR = 3; // create directory
- private static final byte OP_SET_REPLICATION = 4; // set replication
- //the following two are used only for backward compatibility :
- @Deprecated private static final byte OP_DATANODE_ADD = 5;
- @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
- private static final byte OP_SET_PERMISSIONS = 7;
- private static final byte OP_SET_OWNER = 8;
- private static final byte OP_CLOSE = 9; // close after write
- private static final byte OP_SET_GENSTAMP = 10; // store genstamp
- /* The following two are not used any more. Should be removed once
- * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
- private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
- private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
- private static final byte OP_TIMES = 13; // sets mod & access time on a file
- private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
- private static final byte OP_RENAME = 15; // new rename
- private static final byte OP_CONCAT_DELETE = 16; // concat files.
- private static final byte OP_SYMLINK = 17; // a symbolic link
- private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
- private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
- private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
- private static final byte OP_UPDATE_MASTER_KEY = 21; //update master key
-
- /*
- * The following operations are used to control remote edit log streams,
- * and not logged into file streams.
- */
- static final byte OP_JSPOOL_START = // start journal spool
- NamenodeProtocol.JA_JSPOOL_START;
- static final byte OP_CHECKPOINT_TIME = // incr checkpoint time
- NamenodeProtocol.JA_CHECKPOINT_TIME;
+
+ abstract static class Ops {
+ public static final byte OP_INVALID = -1;
+ public static final byte OP_ADD = 0;
+ public static final byte OP_RENAME_OLD = 1; // rename
+ public static final byte OP_DELETE = 2; // delete
+ public static final byte OP_MKDIR = 3; // create directory
+ public static final byte OP_SET_REPLICATION = 4; // set replication
+ //the following two are used only for backward compatibility :
+ @Deprecated public static final byte OP_DATANODE_ADD = 5;
+ @Deprecated public static final byte OP_DATANODE_REMOVE = 6;
+ public static final byte OP_SET_PERMISSIONS = 7;
+ public static final byte OP_SET_OWNER = 8;
+ public static final byte OP_CLOSE = 9; // close after write
+ public static final byte OP_SET_GENSTAMP = 10; // store genstamp
+ /* The following two are not used any more. Should be removed once
+ * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
+ public static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
+ public static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
+ public static final byte OP_TIMES = 13; // sets mod & access time on a file
+ public static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+ public static final byte OP_RENAME = 15; // new rename
+ public static final byte OP_CONCAT_DELETE = 16; // concat files.
+ public static final byte OP_SYMLINK = 17; // a symbolic link
+ public static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
+ public static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
+ public static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
+ public static final byte OP_UPDATE_MASTER_KEY = 21; //update master key
+
+ /*
+ * The following operations are used to control remote edit log streams,
+ * and not logged into file streams.
+ */
+ static final byte OP_JSPOOL_START = // start journal spool
+ NamenodeProtocol.JA_JSPOOL_START;
+ static final byte OP_CHECKPOINT_TIME = // incr checkpoint time
+ NamenodeProtocol.JA_CHECKPOINT_TIME;
+ }
static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
" File system changes are not persistent. No journal streams.";
+ private static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
private volatile int sizeOutputFlushBuffer = 512*1024;
private ArrayList<EditLogOutputStream> editStreams = null;
@@ -198,7 +193,7 @@ public class FSEditLog {
try {
addNewEditLogStream(eFile);
} catch (IOException e) {
- FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
+ LOG.warn("Unable to open edit log file " + eFile);
// Remove the directory from list of storage directories
if(al == null) al = new ArrayList<StorageDirectory>(1);
al.add(sd);
@@ -243,7 +238,7 @@ public class FSEditLog {
try {
closeStream(eStream);
} catch (IOException e) {
- FSNamesystem.LOG.warn("FSEditLog:close - failed to close stream "
+ LOG.warn("FSEditLog:close - failed to close stream "
+ eStream.getName());
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
@@ -281,17 +276,17 @@ public class FSEditLog {
}
String lsd = fsimage.listStorageDirectories();
- FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
+ LOG.info("current list of storage dirs:" + lsd);
ArrayList<StorageDirectory> al = null;
for (EditLogOutputStream eStream : errorStreams) {
- FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName()
+ LOG.error("Unable to log edits to " + eStream.getName()
+ "; removing it");
StorageDirectory storageDir;
if(propagate && eStream.getType() == JournalType.FILE && //find SD
(storageDir = getStorage(eStream)) != null) {
- FSNamesystem.LOG.info("about to remove corresponding storage:"
+ LOG.info("about to remove corresponding storage:"
+ storageDir.getRoot().getAbsolutePath());
// remove corresponding storage dir
if(al == null) al = new ArrayList<StorageDirectory>(1);
@@ -303,7 +298,7 @@ public class FSEditLog {
if (es == eStream) {
try { eStream.close(); } catch (IOException e) {
// nothing to do.
- FSNamesystem.LOG.warn("Failed to close eStream " + eStream.getName()
+ LOG.warn("Failed to close eStream " + eStream.getName()
+ " before removing it (might be ok)");
}
ies.remove();
@@ -314,7 +309,7 @@ public class FSEditLog {
if (editStreams == null || editStreams.size() <= 0) {
String msg = "Fatal Error: All storage directories are inaccessible.";
- FSNamesystem.LOG.fatal(msg, new IOException(msg));
+ LOG.fatal(msg, new IOException(msg));
Runtime.getRuntime().exit(-1);
}
@@ -325,7 +320,7 @@ public class FSEditLog {
if(propagate) incrementCheckpointTime();
lsd = fsimage.listStorageDirectories();
- FSNamesystem.LOG.info("at the end current list of storage dirs:" + lsd);
+ LOG.info("at the end current list of storage dirs:" + lsd);
}
@@ -341,7 +336,7 @@ public class FSEditLog {
Iterator<StorageDirectory> it = fsimage.dirIterator();
while (it.hasNext()) {
StorageDirectory sd = it.next();
- FSNamesystem.LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
+ LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
return sd;
}
@@ -371,491 +366,8 @@ public class FSEditLog {
}
/**
- * Load an edit log, and apply the changes to the in-memory structure
- * This is where we apply edits that we've been writing to disk all
- * along.
- */
- int loadFSEdits(EditLogInputStream edits) throws IOException {
- DataInputStream in = edits.getDataInputStream();
- long startTime = now();
- int numEdits = loadFSEdits(in, true);
- FSImage.LOG.info("Edits file " + edits.getName()
- + " of size " + edits.length() + " edits # " + numEdits
- + " loaded in " + (now()-startTime)/1000 + " seconds.");
- return numEdits;
- }
-
- int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
- int numEdits = 0;
- int logVersion = 0;
-
- try {
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
- try {
- logVersion = in.readByte();
- } catch (EOFException e) {
- available = false;
- }
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
- }
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
- numEdits = loadEditRecords(logVersion, in, false);
- } finally {
- if(closeOnExit)
- in.close();
- }
- if (logVersion != FSConstants.LAYOUT_VERSION) // other version
- numEdits++; // save this image asap
- return numEdits;
- }
-
- @SuppressWarnings("deprecation")
- int loadEditRecords(int logVersion, DataInputStream in,
- boolean closeOnExit) throws IOException {
- FSNamesystem fsNamesys = fsimage.getFSNamesystem();
- FSDirectory fsDir = fsNamesys.dir;
- int numEdits = 0;
- String clientName = null;
- String clientMachine = null;
- String path = null;
- int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
- numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
- numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
- numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0,
- numOpSymlink = 0, numOpGetDelegationToken = 0,
- numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
- numOpUpdateMasterKey = 0, numOpOther = 0;
-
- try {
- while (true) {
- long timestamp = 0;
- long mtime = 0;
- long atime = 0;
- long blockSize = 0;
- byte opcode = -1;
- try {
- in.mark(1);
- opcode = in.readByte();
- if (opcode == OP_INVALID) {
- in.reset(); // reset back to end of file if somebody reads it again
- break; // no more transactions
- }
- } catch (EOFException e) {
- break; // no more transactions
- }
- numEdits++;
- switch (opcode) {
- case OP_ADD:
- case OP_CLOSE: {
- // versions > 0 support per file replication
- // get name and replication
- int length = in.readInt();
- if (-7 == logVersion && length != 3||
- -17 < logVersion && logVersion < -7 && length != 4 ||
- logVersion <= -17 && length != 5) {
- throw new IOException("Incorrect data format." +
- " logVersion is " + logVersion +
- " but writables.length is " +
- length + ". ");
- }
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- mtime = readLong(in);
- if (logVersion <= -17) {
- atime = readLong(in);
- }
- if (logVersion < -7) {
- blockSize = readLong(in);
- }
- // get blocks
- boolean isFileUnderConstruction = (opcode == OP_ADD);
- BlockInfo blocks[] =
- readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
- // Older versions of HDFS does not store the block size in inode.
- // If the file has more than one block, use the size of the
- // first block as the blocksize. Otherwise use the default
- // block size.
- if (-8 <= logVersion && blockSize == 0) {
- if (blocks.length > 1) {
- blockSize = blocks[0].getNumBytes();
- } else {
- long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
- blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
- }
- }
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
- }
-
- // clientname, clientMachine and block locations of last block.
- if (opcode == OP_ADD && logVersion <= -12) {
- clientName = FSImage.readString(in);
- clientMachine = FSImage.readString(in);
- if (-13 <= logVersion) {
- readDatanodeDescriptorArray(in);
- }
- } else {
- clientName = "";
- clientMachine = "";
- }
-
- // The open lease transaction re-creates a file if necessary.
- // Delete the file if it already exists.
- if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(opcode + ": " + path +
- " numblocks : " + blocks.length +
- " clientHolder " + clientName +
- " clientMachine " + clientMachine);
- }
-
- fsDir.unprotectedDelete(path, mtime);
-
- // add to the file tree
- INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- path, permissions,
- blocks, replication,
- mtime, atime, blockSize);
- if (isFileUnderConstruction) {
- numOpAdd++;
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- clientName,
- clientMachine,
- null);
- fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.getClientName(), path);
- }
- break;
- }
- case OP_SET_REPLICATION: {
- numOpSetRepl++;
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- fsDir.unprotectedSetReplication(path, replication, null);
- break;
- }
- case OP_CONCAT_DELETE: {
- if (logVersion > -22) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpConcatDelete++;
- int length = in.readInt();
- if (length < 3) { // trg, srcs.., timestam
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String trg = FSImage.readString(in);
- int srcSize = length - 1 - 1; //trg and timestamp
- String [] srcs = new String [srcSize];
- for(int i=0; i<srcSize;i++) {
- srcs[i]= FSImage.readString(in);
- }
- timestamp = readLong(in);
- fsDir.unprotectedConcat(trg, srcs);
- break;
- }
- case OP_RENAME_OLD: {
- numOpRenameOld++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImage.readString(in);
- String d = FSImage.readString(in);
- timestamp = readLong(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp);
- fsNamesys.changeLease(s, d, dinfo);
- break;
- }
- case OP_DELETE: {
- numOpDelete++;
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "delete operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);
- fsDir.unprotectedDelete(path, timestamp);
- break;
- }
- case OP_MKDIR: {
- numOpMkDir++;
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- int length = in.readInt();
- if (-17 < logVersion && length != 2 ||
- logVersion <= -17 && length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);
-
- // The disk format stores atimes for directories as well.
- // However, currently this is not being updated/used because of
- // performance reasons.
- if (logVersion <= -17) {
- atime = readLong(in);
- }
-
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
- }
- fsDir.unprotectedMkdir(path, permissions, timestamp);
- break;
- }
- case OP_SET_GENSTAMP: {
- numOpSetGenStamp++;
- long lw = in.readLong();
- fsNamesys.setGenerationStamp(lw);
- break;
- }
- case OP_DATANODE_ADD: {
- numOpOther++;
- FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
- nodeimage.readFields(in);
- //Datanodes are not persistent any more.
- break;
- }
- case OP_DATANODE_REMOVE: {
- numOpOther++;
- DatanodeID nodeID = new DatanodeID();
- nodeID.readFields(in);
- //Datanodes are not persistent any more.
- break;
- }
- case OP_SET_PERMISSIONS: {
- numOpSetPerm++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetPermission(
- FSImage.readString(in), FsPermission.read(in));
- break;
- }
- case OP_SET_OWNER: {
- numOpSetOwner++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetOwner(FSImage.readString(in),
- FSImage.readString_EmptyAsNull(in),
- FSImage.readString_EmptyAsNull(in));
- break;
- }
- case OP_SET_NS_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedSetQuota(FSImage.readString(in),
- readLongWritable(in),
- FSConstants.QUOTA_DONT_SET);
- break;
- }
- case OP_CLEAR_NS_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedSetQuota(FSImage.readString(in),
- FSConstants.QUOTA_RESET,
- FSConstants.QUOTA_DONT_SET);
- break;
- }
-
- case OP_SET_QUOTA:
- fsDir.unprotectedSetQuota(FSImage.readString(in),
- readLongWritable(in),
- readLongWritable(in));
-
- break;
-
- case OP_TIMES: {
- numOpTimes++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "times operation.");
- }
- path = FSImage.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- fsDir.unprotectedSetTimes(path, mtime, atime, true);
- break;
- }
- case OP_SYMLINK: {
- numOpSymlink++;
- int length = in.readInt();
- if (length != 4) {
- throw new IOException("Incorrect data format. "
- + "symlink operation.");
- }
- path = FSImage.readString(in);
- String value = FSImage.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- PermissionStatus perm = PermissionStatus.read(in);
- fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
- break;
- }
- case OP_RENAME: {
- if (logVersion > -21) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpRename++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImage.readString(in);
- String d = FSImage.readString(in);
- timestamp = readLong(in);
- Rename[] options = readRenameOptions(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp, options);
- fsNamesys.changeLease(s, d, dinfo);
- break;
- }
- case OP_GET_DELEGATION_TOKEN: {
- if (logVersion > -24) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpGetDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
- fsNamesys.getDelegationTokenSecretManager()
- .addPersistedDelegationToken(delegationTokenId, expiryTime);
- break;
- }
- case OP_RENEW_DELEGATION_TOKEN: {
- if (logVersion > -24) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpRenewDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
- fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
- break;
- }
- case OP_CANCEL_DELEGATION_TOKEN: {
- if (logVersion > -24) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpCancelDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenCancellation(delegationTokenId);
- break;
- }
- case OP_UPDATE_MASTER_KEY: {
- if (logVersion > -24) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- numOpUpdateMasterKey++;
- DelegationKey delegationKey = new DelegationKey();
- delegationKey.readFields(in);
- fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
- delegationKey);
- break;
- }
- default: {
- throw new IOException("Never seen opcode " + opcode);
- }
- }
- }
- } finally {
- if(closeOnExit)
- in.close();
- }
- if (FSImage.LOG.isDebugEnabled()) {
- FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose
- + " numOpDelete = " + numOpDelete
- + " numOpRenameOld = " + numOpRenameOld
- + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
- + " numOpSetPerm = " + numOpSetPerm
- + " numOpSetOwner = " + numOpSetOwner
- + " numOpSetGenStamp = " + numOpSetGenStamp
- + " numOpTimes = " + numOpTimes
- + " numOpConcatDelete = " + numOpConcatDelete
- + " numOpRename = " + numOpRename
- + " numOpGetDelegationToken = " + numOpGetDelegationToken
- + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
- + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
- + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
- + " numOpOther = " + numOpOther);
- }
- return numEdits;
- }
-
- // a place holder for reading a long
- private static final LongWritable longWritable = new LongWritable();
-
- /** Read an integer from an input stream */
- private static long readLongWritable(DataInputStream in) throws IOException {
- synchronized (longWritable) {
- longWritable.readFields(in);
- return longWritable.get();
- }
- }
-
- short adjustReplication(short replication) {
- FSNamesystem fsNamesys = fsimage.getFSNamesystem();
- short minReplication = fsNamesys.getMinReplication();
- if (replication<minReplication) {
- replication = minReplication;
- }
- short maxReplication = fsNamesys.getMaxReplication();
- if (replication>maxReplication) {
- replication = maxReplication;
- }
- return replication;
- }
-
- /**
- * Write an operation to the edit log.
- * Automatically sync buffered edits to persistent store if it is time
- * to sync.
+ * Write an operation to the edit log. Do not sync to persistent
+ * store yet.
*/
void logEdit(byte op, Writable ... writables) {
synchronized (this) {
@@ -867,16 +379,12 @@ public class FSEditLog {
ArrayList<EditLogOutputStream> errorStreams = null;
long start = now();
for(EditLogOutputStream eStream : editStreams) {
- if(FSImage.LOG.isDebugEnabled()) {
- FSImage.LOG.debug("loggin edits into " + eStream.getName() +
- " stream");
- }
if(!eStream.isOperationSupported(op))
continue;
try {
eStream.write(op, writables);
} catch (IOException ie) {
- FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
+ LOG.error("logEdit: removing "+ eStream.getName(), ie);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1041,7 +549,7 @@ public class FSEditLog {
eStream.setReadyToFlush();
streams.add(eStream);
} catch (IOException ie) {
- FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
+ LOG.error("Unable to get ready to flush.", ie);
//
// remember the streams that encountered an error.
//
@@ -1063,7 +571,7 @@ public class FSEditLog {
try {
eStream.flush();
} catch (IOException ie) {
- FSNamesystem.LOG.error("Unable to sync edit log.", ie);
+ LOG.error("Unable to sync edit log.", ie);
//
// remember the streams that encountered an error.
//
@@ -1119,7 +627,7 @@ public class FSEditLog {
buf.append(eStream.getTotalSyncTime());
buf.append(" ");
}
- FSNamesystem.LOG.info(buf);
+ LOG.info(buf);
}
/**
@@ -1134,7 +642,7 @@ public class FSEditLog {
FSEditLog.toLogLong(newNode.getModificationTime()),
FSEditLog.toLogLong(newNode.getAccessTime()),
FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_ADD,
+ logEdit(Ops.OP_ADD,
new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
new ArrayWritable(Block.class, newNode.getBlocks()),
newNode.getPermissionStatus(),
@@ -1152,7 +660,7 @@ public class FSEditLog {
FSEditLog.toLogLong(newNode.getModificationTime()),
FSEditLog.toLogLong(newNode.getAccessTime()),
FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_CLOSE,
+ logEdit(Ops.OP_CLOSE,
new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
new ArrayWritable(Block.class, newNode.getBlocks()),
newNode.getPermissionStatus());
@@ -1167,7 +675,7 @@ public class FSEditLog {
FSEditLog.toLogLong(newNode.getModificationTime()),
FSEditLog.toLogLong(newNode.getAccessTime())
};
- logEdit(OP_MKDIR, new ArrayWritable(DeprecatedUTF8.class, info),
+ logEdit(Ops.OP_MKDIR, new ArrayWritable(DeprecatedUTF8.class, info),
newNode.getPermissionStatus());
}
@@ -1180,7 +688,7 @@ public class FSEditLog {
new DeprecatedUTF8(src),
new DeprecatedUTF8(dst),
FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+ logEdit(Ops.OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
}
/**
@@ -1191,7 +699,7 @@ public class FSEditLog {
new DeprecatedUTF8(src),
new DeprecatedUTF8(dst),
FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
+ logEdit(Ops.OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
toBytesWritable(options));
}
@@ -1199,7 +707,7 @@ public class FSEditLog {
* Add set replication record to edit log
*/
void logSetReplication(String src, short replication) {
- logEdit(OP_SET_REPLICATION,
+ logEdit(Ops.OP_SET_REPLICATION,
new DeprecatedUTF8(src),
FSEditLog.toLogReplication(replication));
}
@@ -1210,20 +718,20 @@ public class FSEditLog {
* @param quota the directory size limit
*/
void logSetQuota(String src, long nsQuota, long dsQuota) {
- logEdit(OP_SET_QUOTA, new DeprecatedUTF8(src),
+ logEdit(Ops.OP_SET_QUOTA, new DeprecatedUTF8(src),
new LongWritable(nsQuota), new LongWritable(dsQuota));
}
/** Add set permissions record to edit log */
void logSetPermissions(String src, FsPermission permissions) {
- logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+ logEdit(Ops.OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
}
/** Add set owner record to edit log */
void logSetOwner(String src, String username, String groupname) {
DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
- logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+ logEdit(Ops.OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
}
/**
@@ -1238,7 +746,7 @@ public class FSEditLog {
info[idx++] = new DeprecatedUTF8(srcs[i]);
}
info[idx] = FSEditLog.toLogLong(timestamp);
- logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ logEdit(Ops.OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
}
/**
@@ -1248,14 +756,14 @@ public class FSEditLog {
DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
new DeprecatedUTF8(src),
FSEditLog.toLogLong(timestamp)};
- logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ logEdit(Ops.OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
}
/**
* Add generation stamp record to edit log
*/
void logGenerationStamp(long genstamp) {
- logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+ logEdit(Ops.OP_SET_GENSTAMP, new LongWritable(genstamp));
}
/**
@@ -1266,7 +774,7 @@ public class FSEditLog {
new DeprecatedUTF8(src),
FSEditLog.toLogLong(mtime),
FSEditLog.toLogLong(atime)};
- logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+ logEdit(Ops.OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
}
/**
@@ -1279,7 +787,7 @@ public class FSEditLog {
new DeprecatedUTF8(value),
FSEditLog.toLogLong(mtime),
FSEditLog.toLogLong(atime)};
- logEdit(OP_SYMLINK,
+ logEdit(Ops.OP_SYMLINK,
new ArrayWritable(DeprecatedUTF8.class, info),
node.getPermissionStatus());
}
@@ -1292,20 +800,20 @@ public class FSEditLog {
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ logEdit(Ops.OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
}
void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ logEdit(Ops.OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
}
void logCancelDelegationToken(DelegationTokenIdentifier id) {
- logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+ logEdit(Ops.OP_CANCEL_DELEGATION_TOKEN, id);
}
void logUpdateMasterKey(DelegationKey key) {
- logEdit(OP_UPDATE_MASTER_KEY, key);
+ logEdit(Ops.OP_UPDATE_MASTER_KEY, key);
}
static private DeprecatedUTF8 toLogReplication(short replication) {
@@ -1332,7 +840,7 @@ public class FSEditLog {
"Wrong streams size";
size = Math.max(size, curSize);
} catch (IOException e) {
- FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
+ LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
idx + ") " + es.getName());
if(al==null) al = new ArrayList<EditLogOutputStream>(1);
al.add(es);
@@ -1403,7 +911,7 @@ public class FSEditLog {
// replace by the new stream
itE.replace(eStream);
} catch (IOException e) {
- FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
+ LOG.warn("Error in editStream " + eStream.getName(), e);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1482,7 +990,7 @@ public class FSEditLog {
// replace by the new stream
itE.replace(eStream);
} catch (IOException e) {
- FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
+ LOG.warn("Error in editStream " + eStream.getName(), e);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1520,84 +1028,6 @@ public class FSEditLog {
sizeOutputFlushBuffer = size;
}
- /**
- * A class to read in blocks stored in the old format. The only two
- * fields in the block were blockid and length.
- */
- static class BlockTwo implements Writable {
- long blkid;
- long len;
-
- static { // register a ctor
- WritableFactories.setFactory
- (BlockTwo.class,
- new WritableFactory() {
- public Writable newInstance() { return new BlockTwo(); }
- });
- }
-
-
- BlockTwo() {
- blkid = 0;
- len = 0;
- }
- /////////////////////////////////////
- // Writable
- /////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeLong(blkid);
- out.writeLong(len);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.blkid = in.readLong();
- this.len = in.readLong();
- }
- }
-
- /** This method is defined for compatibility reason. */
- static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
- ) throws IOException {
- DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
- for (int i = 0; i < locations.length; i++) {
- locations[i] = new DatanodeDescriptor();
- locations[i].readFieldsFromFSEditLog(in);
- }
- return locations;
- }
-
- static private short readShort(DataInputStream in) throws IOException {
- return Short.parseShort(FSImage.readString(in));
- }
-
- static private long readLong(DataInputStream in) throws IOException {
- return Long.parseLong(FSImage.readString(in));
- }
-
- static private BlockInfo[] readBlocks(
- DataInputStream in,
- int logVersion,
- boolean isFileUnderConstruction,
- short replication) throws IOException {
- int numBlocks = in.readInt();
- BlockInfo[] blocks = new BlockInfo[numBlocks];
- Block blk = new Block();
- BlockTwo oldblk = new BlockTwo();
- for (int i = 0; i < numBlocks; i++) {
- if (logVersion <= -14) {
- blk.readFields(in);
- } else {
- oldblk.readFields(in);
- blk.set(oldblk.blkid, oldblk.len,
- GenerationStamp.GRANDFATHER_GENERATION_STAMP);
- }
- if(isFileUnderConstruction && i == numBlocks-1)
- blocks[i] = new BlockInfoUnderConstruction(blk, replication);
- else
- blocks[i] = new BlockInfo(blk, replication);
- }
- return blocks;
- }
boolean isEmpty() throws IOException {
return getEditLogSize() <= 0;
@@ -1632,7 +1062,7 @@ public class FSEditLog {
boStream = new EditLogBackupOutputStream(bnReg, nnReg);
editStreams.add(boStream);
}
- logEdit(OP_JSPOOL_START, (Writable[])null);
+ logEdit(Ops.OP_JSPOOL_START, (Writable[])null);
}
/**
@@ -1648,7 +1078,7 @@ public class FSEditLog {
try {
eStream.write(data, 0, length);
} catch (IOException ie) {
- FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), ie);
+ LOG.warn("Error in editStream " + eStream.getName(), ie);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
@@ -1730,7 +1160,7 @@ public class FSEditLog {
void incrementCheckpointTime() {
fsimage.incrementCheckpointTime();
Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
- logEdit(OP_CHECKPOINT_TIME, args);
+ logEdit(Ops.OP_CHECKPOINT_TIME, args);
}
synchronized void releaseBackupStream(NamenodeRegistration registration) {
@@ -1781,19 +1211,6 @@ public class FSEditLog {
return regAllowed;
}
- static Rename[] readRenameOptions(DataInputStream in) throws IOException {
- BytesWritable writable = new BytesWritable();
- writable.readFields(in);
-
- byte[] bytes = writable.getBytes();
- Rename[] options = new Rename[bytes.length];
-
- for (int i = 0; i < bytes.length; i++) {
- options[i] = Rename.valueOf(bytes[i]);
- }
- return options;
- }
-
static BytesWritable toBytesWritable(Options.Rename... options) {
byte[] bytes = new byte[options.length];
for (int i = 0; i < options.length; i++) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Oct 29 17:04:07 2010
@@ -1142,7 +1142,7 @@ public class FSImage extends Storage {
long blockSize = 0;
pathComponents = readPathComponents(in);
replication = in.readShort();
- replication = editLog.adjustReplication(replication);
+ replication = fsNamesys.adjustReplication(replication);
modificationTime = in.readLong();
if (imgVersion <= -17) {
atime = in.readLong();
@@ -1283,17 +1283,19 @@ public class FSImage extends Storage {
* @throws IOException
*/
int loadFSEdits(StorageDirectory sd) throws IOException {
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+
int numEdits = 0;
EditLogFileInputStream edits =
new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
- numEdits = editLog.loadFSEdits(edits);
+ numEdits = loader.loadFSEdits(edits);
edits.close();
File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
if (editsNew.exists() && editsNew.length() > 0) {
edits = new EditLogFileInputStream(editsNew);
- numEdits += editLog.loadFSEdits(edits);
+ numEdits += loader.loadFSEdits(edits);
edits.close();
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 29 17:04:07 2010
@@ -3470,6 +3470,22 @@ public class FSNamesystem implements FSC
short getMaxReplication() { return (short)blockManager.maxReplication; }
short getMinReplication() { return (short)blockManager.minReplication; }
short getDefaultReplication() { return (short)blockManager.defaultReplication; }
+
+ /**
+ * Clamp the specified replication between the minimum and maximum
+ * replication levels for this namesystem.
+ */
+ short adjustReplication(short replication) {
+ short minReplication = getMinReplication();
+ if (replication < minReplication) {
+ replication = minReplication;
+ }
+ short maxReplication = getMaxReplication();
+ if (replication > maxReplication) {
+ replication = maxReplication;
+ }
+ return replication;
+ }
/**
* A immutable object that stores the number of live replicas and
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Oct 29 17:04:07 2010
@@ -138,11 +138,12 @@ public class TestEditLog extends TestCas
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
//
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem);
for (Iterator<StorageDirectory> it =
fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
System.out.println("Verifying file: " + editFile);
- int numEdits = namesystem.getEditLog().loadFSEdits(
+ int numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile));
int numLeases = namesystem.leaseManager.countLease();
System.out.println("Number of outstanding leases " + numLeases);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Fri Oct 29 17:04:07 2010
@@ -221,7 +221,7 @@ public class TestEditLogRace {
fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
System.out.println("Verifying file: " + editFile);
- int numEdits = namesystem.getEditLog().loadFSEdits(
+ int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
new EditLogFileInputStream(editFile));
System.out.println("Number of edits: " + numEdits);
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Fri Oct 29 17:04:07 2010
@@ -134,13 +134,14 @@ public class TestSecurityTokenEditLog ex
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
//
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem);
namesystem.getDelegationTokenSecretManager().stopThreads();
int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
for (Iterator<StorageDirectory> it =
fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
System.out.println("Verifying file: " + editFile);
- int numEdits = namesystem.getEditLog().loadFSEdits(
+ int numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile));
assertTrue("Verification for " + editFile + " failed. " +
"Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + numKeys) + " transactions. "+