Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/09 20:36:17 UTC
svn commit: r1134031 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: todd
Date: Thu Jun 9 18:36:16 2011
New Revision: 1134031
URL: http://svn.apache.org/viewvc?rev=1134031&view=rev
Log:
HDFS-2003. Separate FSEditLog reading logic from edit log memory state building logic. Contributed by Ivan Kelly.
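The core of this change is a parse/apply split: each opcode gains a small op class that knows how to deserialize its own on-disk fields, and FSEditLogLoader shrinks to a loop plus a switch that applies already-parsed ops to the in-memory namesystem. A minimal sketch of the shape, with illustrative names rather than the actual Hadoop classes:

    import java.io.DataInputStream;
    import java.io.IOException;

    // Ops own their wire format; the loader never touches raw bytes.
    abstract class EditOp {
        final byte opCode;
        EditOp(byte opCode) { this.opCode = opCode; }
        // Each subclass parses only its own fields from the stream.
        abstract void readFields(DataInputStream in, int logVersion)
            throws IOException;
    }

    class MkdirEditOp extends EditOp {
        String path;
        long timestamp;
        MkdirEditOp() { super((byte) 3); } // opcode value is illustrative
        @Override
        void readFields(DataInputStream in, int logVersion)
                throws IOException {
            path = in.readUTF();
            timestamp = in.readLong();
        }
    }

The loader then reduces to repeated readOp() calls and a switch on op.opCode, as the FSEditLogLoader.java diff below shows.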
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Jun 9 18:36:16 2011
@@ -493,6 +493,9 @@ Trunk (unreleased changes)
HDFS-1586. Add InterfaceAudience and InterfaceStability annotations to
MiniDFSCluster. (suresh)
+ HDFS-2003. Separate FSEditLog reading logic from edit log memory state
+ building logic. (Ivan Kelly via todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Jun 9 18:36:16 2011
@@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedInputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -29,27 +27,15 @@ import java.util.Arrays;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
@@ -80,7 +66,7 @@ public class FSEditLogLoader {
*/
int readLogVersion(DataInputStream in) throws IOException {
int logVersion = 0;
- // Read log file version. Could be missing.
+ // Read log file version. Could be missing.
in.mark(4);
// If edits log is greater than 2G, available method will return negative
// numbers, so we avoid having to call available
@@ -96,7 +82,7 @@ public class FSEditLogLoader {
if (logVersion < FSConstants.LAYOUT_VERSION) // future version
throw new IOException(
"Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
+ + logVersion + ". Current version = "
+ FSConstants.LAYOUT_VERSION + ".");
}
assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
@@ -107,7 +93,7 @@ public class FSEditLogLoader {
int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
BufferedInputStream bin = new BufferedInputStream(edits);
DataInputStream in = new DataInputStream(bin);
-
+
int numEdits = 0;
int logVersion = 0;
@@ -134,9 +120,7 @@ public class FSEditLogLoader {
Checksum checksum, boolean closeOnExit) throws IOException {
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
- String clientName = null;
- String clientMachine = null;
- String path = null;
+
int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
@@ -154,57 +138,41 @@ public class FSEditLogLoader {
try {
try {
- while (true) {
- long timestamp = 0;
- long mtime = 0;
- long atime = 0;
- long blockSize = 0;
- FSEditLogOpCodes opCode;
- try {
- if (checksum != null) {
- checksum.reset();
- }
- in.mark(1);
- byte opCodeByte = in.readByte();
- opCode = FSEditLogOpCodes.fromByte(opCodeByte);
- if (opCode == FSEditLogOpCodes.OP_INVALID) {
- in.reset(); // reset back to end of file if somebody reads it again
- break; // no more transactions
- }
- } catch (EOFException e) {
- break; // no more transactions
- }
+ FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
+ checksum);
+ FSEditLogOp op;
+ while ((op = reader.readOp()) != null) {
recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
tracker.getPos();
numEdits++;
- switch (opCode) {
+ switch (op.opCode) {
case OP_ADD:
case OP_CLOSE: {
+ AddCloseOp addCloseOp = (AddCloseOp)op;
+
// versions > 0 support per file replication
// get name and replication
- int length = in.readInt();
- if (-7 == logVersion && length != 3||
- -17 < logVersion && logVersion < -7 && length != 4 ||
- logVersion <= -17 && length != 5) {
- throw new IOException("Incorrect data format." +
- " logVersion is " + logVersion +
- " but writables.length is " +
- length + ". ");
- }
- path = FSImageSerialization.readString(in);
- short replication = fsNamesys.adjustReplication(readShort(in));
- mtime = readLong(in);
- if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
- atime = readLong(in);
+ short replication
+ = fsNamesys.adjustReplication(addCloseOp.replication);
+
+ long blockSize = addCloseOp.blockSize;
+ BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
+ for (int i = 0; i < addCloseOp.blocks.length; i++) {
+ if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
+ && i == addCloseOp.blocks.length-1) {
+ blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
+ replication);
+ } else {
+ blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+ }
}
- if (logVersion < -7) {
- blockSize = readLong(in);
+
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ if (addCloseOp.permissions != null) {
+ permissions = addCloseOp.permissions;
}
- // get blocks
- boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
- BlockInfo blocks[] =
- readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
+
+
// Older versions of HDFS do not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
@@ -217,41 +185,25 @@ public class FSEditLogLoader {
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
- }
-
- // clientname, clientMachine and block locations of last block.
- if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
- clientName = FSImageSerialization.readString(in);
- clientMachine = FSImageSerialization.readString(in);
- if (-13 <= logVersion) {
- readDatanodeDescriptorArray(in);
- }
- } else {
- clientName = "";
- clientMachine = "";
- }
-
+
+
// The open lease transaction re-creates a file if necessary.
// Delete the file if it already exists.
if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(opCode + ": " + path +
- " numblocks : " + blocks.length +
- " clientHolder " + clientName +
- " clientMachine " + clientMachine);
+ FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+ " numblocks : " + blocks.length +
+ " clientHolder " + addCloseOp.clientName +
+ " clientMachine " + addCloseOp.clientMachine);
}
-
- fsDir.unprotectedDelete(path, mtime);
-
+
+ fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
+
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- path, permissions,
- blocks, replication,
- mtime, atime, blockSize);
- if (isFileUnderConstruction) {
+ addCloseOp.path, permissions,
+ blocks, replication,
+ addCloseOp.mtime, addCloseOp.atime, blockSize);
+ if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
numOpAdd++;
//
// Replace current node with a INodeUnderConstruction.
@@ -259,247 +211,193 @@ public class FSEditLogLoader {
//
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
- node.getReplication(),
+ node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
- clientName,
- clientMachine,
+ addCloseOp.clientName,
+ addCloseOp.clientMachine,
null);
- fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+ fsDir.replaceNode(addCloseOp.path, node, cons);
+ fsNamesys.leaseManager.addLease(cons.getClientName(),
+ addCloseOp.path);
}
break;
- }
+ }
case OP_SET_REPLICATION: {
numOpSetRepl++;
- path = FSImageSerialization.readString(in);
- short replication = fsNamesys.adjustReplication(readShort(in));
- fsDir.unprotectedSetReplication(path, replication, null);
+ SetReplicationOp setReplicationOp = (SetReplicationOp)op;
+ short replication
+ = fsNamesys.adjustReplication(setReplicationOp.replication);
+ fsDir.unprotectedSetReplication(setReplicationOp.path,
+ replication, null);
break;
- }
+ }
case OP_CONCAT_DELETE: {
numOpConcatDelete++;
- int length = in.readInt();
- if (length < 3) { // trg, srcs.., timestam
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String trg = FSImageSerialization.readString(in);
- int srcSize = length - 1 - 1; //trg and timestamp
- String [] srcs = new String [srcSize];
- for(int i=0; i<srcSize;i++) {
- srcs[i]= FSImageSerialization.readString(in);
- }
- timestamp = readLong(in);
- fsDir.unprotectedConcat(trg, srcs);
+
+ ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
+ fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
break;
}
case OP_RENAME_OLD: {
numOpRenameOld++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImageSerialization.readString(in);
- String d = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp);
- fsNamesys.changeLease(s, d, dinfo);
+ RenameOldOp renameOp = (RenameOldOp)op;
+ HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+ fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+ renameOp.timestamp);
+ fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_DELETE: {
numOpDelete++;
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "delete operation.");
- }
- path = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- fsDir.unprotectedDelete(path, timestamp);
+
+ DeleteOp deleteOp = (DeleteOp)op;
+ fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
break;
}
case OP_MKDIR: {
numOpMkDir++;
+ MkdirOp mkdirOp = (MkdirOp)op;
PermissionStatus permissions = fsNamesys.getUpgradePermission();
- int length = in.readInt();
- if (-17 < logVersion && length != 2 ||
- logVersion <= -17 && length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- path = FSImageSerialization.readString(in);
- timestamp = readLong(in);
-
- // The disk format stores atimes for directories as well.
- // However, currently this is not being updated/used because of
- // performance reasons.
- if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
- atime = readLong(in);
- }
-
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
+ if (mkdirOp.permissions != null) {
+ permissions = mkdirOp.permissions;
}
- fsDir.unprotectedMkdir(path, permissions, timestamp);
+
+ fsDir.unprotectedMkdir(mkdirOp.path, permissions,
+ mkdirOp.timestamp);
break;
}
case OP_SET_GENSTAMP: {
numOpSetGenStamp++;
- long lw = in.readLong();
- fsNamesys.setGenerationStamp(lw);
- break;
- }
- case OP_DATANODE_ADD: {
- numOpOther++;
- //Datanodes are not persistent any more.
- FSImageSerialization.DatanodeImage.skipOne(in);
- break;
- }
- case OP_DATANODE_REMOVE: {
- numOpOther++;
- DatanodeID nodeID = new DatanodeID();
- nodeID.readFields(in);
- //Datanodes are not persistent any more.
+ SetGenstampOp setGenstampOp = (SetGenstampOp)op;
+ fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
break;
}
case OP_SET_PERMISSIONS: {
numOpSetPerm++;
- fsDir.unprotectedSetPermission(
- FSImageSerialization.readString(in), FsPermission.read(in));
+
+ SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
+ fsDir.unprotectedSetPermission(setPermissionsOp.src,
+ setPermissionsOp.permissions);
break;
}
case OP_SET_OWNER: {
numOpSetOwner++;
- if (logVersion > -11)
- throw new IOException("Unexpected opCode " + opCode
- + " for version " + logVersion);
- fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
- FSImageSerialization.readString_EmptyAsNull(in),
- FSImageSerialization.readString_EmptyAsNull(in));
+
+ SetOwnerOp setOwnerOp = (SetOwnerOp)op;
+ fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
+ setOwnerOp.groupname);
break;
}
case OP_SET_NS_QUOTA: {
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
- readLongWritable(in),
+ SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
+ fsDir.unprotectedSetQuota(setNSQuotaOp.src,
+ setNSQuotaOp.nsQuota,
FSConstants.QUOTA_DONT_SET);
break;
}
case OP_CLEAR_NS_QUOTA: {
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+ ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
+ fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
FSConstants.QUOTA_RESET,
FSConstants.QUOTA_DONT_SET);
break;
}
-
+
case OP_SET_QUOTA:
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
- readLongWritable(in),
- readLongWritable(in));
-
+ SetQuotaOp setQuotaOp = (SetQuotaOp)op;
+ fsDir.unprotectedSetQuota(setQuotaOp.src,
+ setQuotaOp.nsQuota,
+ setQuotaOp.dsQuota);
break;
-
+
case OP_TIMES: {
numOpTimes++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "times operation.");
- }
- path = FSImageSerialization.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- fsDir.unprotectedSetTimes(path, mtime, atime, true);
+ TimesOp timesOp = (TimesOp)op;
+
+ fsDir.unprotectedSetTimes(timesOp.path,
+ timesOp.mtime,
+ timesOp.atime, true);
break;
}
case OP_SYMLINK: {
numOpSymlink++;
- int length = in.readInt();
- if (length != 4) {
- throw new IOException("Incorrect data format. "
- + "symlink operation.");
- }
- path = FSImageSerialization.readString(in);
- String value = FSImageSerialization.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- PermissionStatus perm = PermissionStatus.read(in);
- fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+
+ SymlinkOp symlinkOp = (SymlinkOp)op;
+ fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
+ symlinkOp.mtime, symlinkOp.atime,
+ symlinkOp.permissionStatus);
break;
}
case OP_RENAME: {
numOpRename++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImageSerialization.readString(in);
- String d = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- Rename[] options = readRenameOptions(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp, options);
- fsNamesys.changeLease(s, d, dinfo);
+ RenameOp renameOp = (RenameOp)op;
+
+ HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+ fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+ renameOp.timestamp, renameOp.options);
+ fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_GET_DELEGATION_TOKEN: {
numOpGetDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
+ GetDelegationTokenOp getDelegationTokenOp
+ = (GetDelegationTokenOp)op;
+
fsNamesys.getDelegationTokenSecretManager()
- .addPersistedDelegationToken(delegationTokenId, expiryTime);
+ .addPersistedDelegationToken(getDelegationTokenOp.token,
+ getDelegationTokenOp.expiryTime);
break;
}
case OP_RENEW_DELEGATION_TOKEN: {
numOpRenewDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
+
+ RenewDelegationTokenOp renewDelegationTokenOp
+ = (RenewDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+ .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
+ renewDelegationTokenOp.expiryTime);
break;
}
case OP_CANCEL_DELEGATION_TOKEN: {
numOpCancelDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
+
+ CancelDelegationTokenOp cancelDelegationTokenOp
+ = (CancelDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenCancellation(delegationTokenId);
+ .updatePersistedTokenCancellation(
+ cancelDelegationTokenOp.token);
break;
}
case OP_UPDATE_MASTER_KEY: {
numOpUpdateMasterKey++;
- DelegationKey delegationKey = new DelegationKey();
- delegationKey.readFields(in);
- fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
- delegationKey);
+ UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
+ fsNamesys.getDelegationTokenSecretManager()
+ .updatePersistedMasterKey(updateMasterKeyOp.key);
break;
}
case OP_REASSIGN_LEASE: {
numOpReassignLease++;
- String leaseHolder = FSImageSerialization.readString(in);
- path = FSImageSerialization.readString(in);
- String newHolder = FSImageSerialization.readString(in);
- Lease lease = fsNamesys.leaseManager.getLease(leaseHolder);
+ ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
+
+ Lease lease = fsNamesys.leaseManager.getLease(
+ reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile =
- (INodeFileUnderConstruction) fsDir.getFileINode(path);
- fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile);
+ (INodeFileUnderConstruction) fsDir.getFileINode(
+ reassignLeaseOp.path);
+ fsNamesys.reassignLeaseInternal(lease,
+ reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break;
}
- default: {
- throw new IOException("Never seen opCode " + opCode);
- }
+ case OP_DATANODE_ADD:
+ case OP_DATANODE_REMOVE:
+ numOpOther++;
+ break;
+ default:
+ throw new IOException("Invalid operation read " + op.opCode);
}
- validateChecksum(in, checksum, numEdits);
}
} catch (IOException ex) {
check203UpgradeFailure(logVersion, ex);
@@ -547,126 +445,6 @@ public class FSEditLogLoader {
}
/**
- * Validate a transaction's checksum
- */
- private static void validateChecksum(
- DataInputStream in, Checksum checksum, int tid)
- throws IOException {
- if (checksum != null) {
- int calculatedChecksum = (int)checksum.getValue();
- int readChecksum = in.readInt(); // read in checksum
- if (readChecksum != calculatedChecksum) {
- throw new ChecksumException(
- "Transaction " + tid + " is corrupt. Calculated checksum is " +
- calculatedChecksum + " but read checksum " + readChecksum, tid);
- }
- }
- }
-
- /**
- * A class to read in blocks stored in the old format. The only two
- * fields in the block were blockid and length.
- */
- static class BlockTwo implements Writable {
- long blkid;
- long len;
-
- static { // register a ctor
- WritableFactories.setFactory
- (BlockTwo.class,
- new WritableFactory() {
- public Writable newInstance() { return new BlockTwo(); }
- });
- }
-
-
- BlockTwo() {
- blkid = 0;
- len = 0;
- }
- /////////////////////////////////////
- // Writable
- /////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeLong(blkid);
- out.writeLong(len);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.blkid = in.readLong();
- this.len = in.readLong();
- }
- }
-
- /** This method is defined for compatibility reason. */
- static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
- ) throws IOException {
- DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
- for (int i = 0; i < locations.length; i++) {
- locations[i] = new DatanodeDescriptor();
- locations[i].readFieldsFromFSEditLog(in);
- }
- return locations;
- }
-
- static private short readShort(DataInputStream in) throws IOException {
- return Short.parseShort(FSImageSerialization.readString(in));
- }
-
- static private long readLong(DataInputStream in) throws IOException {
- return Long.parseLong(FSImageSerialization.readString(in));
- }
-
- // a place holder for reading a long
- private static final LongWritable longWritable = new LongWritable();
-
- /** Read an integer from an input stream */
- private static long readLongWritable(DataInputStream in) throws IOException {
- synchronized (longWritable) {
- longWritable.readFields(in);
- return longWritable.get();
- }
- }
-
- static Rename[] readRenameOptions(DataInputStream in) throws IOException {
- BytesWritable writable = new BytesWritable();
- writable.readFields(in);
-
- byte[] bytes = writable.getBytes();
- Rename[] options = new Rename[bytes.length];
-
- for (int i = 0; i < bytes.length; i++) {
- options[i] = Rename.valueOf(bytes[i]);
- }
- return options;
- }
-
- static private BlockInfo[] readBlocks(
- DataInputStream in,
- int logVersion,
- boolean isFileUnderConstruction,
- short replication) throws IOException {
- int numBlocks = in.readInt();
- BlockInfo[] blocks = new BlockInfo[numBlocks];
- Block blk = new Block();
- BlockTwo oldblk = new BlockTwo();
- for (int i = 0; i < numBlocks; i++) {
- if (logVersion <= -14) {
- blk.readFields(in);
- } else {
- oldblk.readFields(in);
- blk.set(oldblk.blkid, oldblk.len,
- GenerationStamp.GRANDFATHER_GENERATION_STAMP);
- }
- if(isFileUnderConstruction && i == numBlocks-1)
- blocks[i] = new BlockInfoUnderConstruction(blk, replication);
- else
- blocks[i] = new BlockInfo(blk, replication);
- }
- return blocks;
- }
-
- /**
* Throw appropriate exception during upgrade from 203, when editlog loading
* could fail due to opcode conflicts.
*/
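Note that per-transaction checksum validation moves with this change: the loader's validateChecksum helper is deleted above and reappears inside Reader#readOp in the new FSEditLogOp.java below, so the checksum is reset before each opcode is read and verified after the op's fields are consumed. A sketch of that flow, assuming a Checksum-wrapped stream with java.util.zip.CRC32 standing in for whatever Checksum implementation the caller supplies:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;

    static byte readOneOpcode(InputStream rawEdits) throws IOException {
        CRC32 checksum = new CRC32();
        // Every byte read through `in` also updates `checksum`.
        DataInputStream in = new DataInputStream(
            new CheckedInputStream(rawEdits, checksum));

        checksum.reset();                  // Reader#readOp does this first
        byte opCode = in.readByte();       // ...then the op reads its fields...
        int calculated = (int) checksum.getValue();
        int stored = in.readInt();         // trailing per-transaction checksum
        if (stored != calculated) {
            throw new IOException("Transaction is corrupt: read checksum "
                + stored + " but calculated " + calculated);
        }
        return opCode;
    }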
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1134031&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Jun 9 18:36:16 2011
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.zip.Checksum;
+import java.util.EnumMap;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+
+/**
+ * Helper classes for reading the ops from an InputStream.
+ * All ops derive from FSEditLogOp and are only
+ * instantiated from Reader#readOp()
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class FSEditLogOp {
+ final FSEditLogOpCodes opCode;
+
+ /**
+ * Constructor for an EditLog Op. EditLog ops cannot be constructed
+ * directly, but only through Reader#readOp.
+ */
+ private FSEditLogOp(FSEditLogOpCodes opCode) {
+ this.opCode = opCode;
+ }
+
+ public abstract void readFields(DataInputStream in, int logVersion)
+ throws IOException;
+
+ static class AddCloseOp extends FSEditLogOp {
+ int length;
+ String path;
+ short replication;
+ long mtime;
+ long atime;
+ long blockSize;
+ Block[] blocks;
+ PermissionStatus permissions;
+ String clientName;
+ String clientMachine;
+ //final DatanodeDescriptor[] dataNodeDescriptors; UNUSED
+
+ private AddCloseOp(FSEditLogOpCodes opCode) {
+ super(opCode);
+ assert(opCode == OP_ADD || opCode == OP_CLOSE);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ // versions > 0 support per file replication
+ // get name and replication
+ this.length = in.readInt();
+ if (-7 == logVersion && length != 3||
+ -17 < logVersion && logVersion < -7 && length != 4 ||
+ logVersion <= -17 && length != 5) {
+ throw new IOException("Incorrect data format." +
+ " logVersion is " + logVersion +
+ " but writables.length is " +
+ length + ". ");
+ }
+ this.path = FSImageSerialization.readString(in);
+ this.replication = readShort(in);
+ this.mtime = readLong(in);
+ if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
+ this.atime = readLong(in);
+ } else {
+ this.atime = 0;
+ }
+ if (logVersion < -7) {
+ this.blockSize = readLong(in);
+ } else {
+ this.blockSize = 0;
+ }
+
+ // get blocks
+ this.blocks = readBlocks(in, logVersion);
+
+ if (logVersion <= -11) {
+ this.permissions = PermissionStatus.read(in);
+ } else {
+ this.permissions = null;
+ }
+
+ // clientname, clientMachine and block locations of last block.
+ if (this.opCode == OP_ADD && logVersion <= -12) {
+ this.clientName = FSImageSerialization.readString(in);
+ this.clientMachine = FSImageSerialization.readString(in);
+ if (-13 <= logVersion) {
+ readDatanodeDescriptorArray(in);
+ }
+ } else {
+ this.clientName = "";
+ this.clientMachine = "";
+ }
+ }
+
+ /** This method is defined for compatibility reason. */
+ private static DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in)
+ throws IOException {
+ DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = new DatanodeDescriptor();
+ locations[i].readFieldsFromFSEditLog(in);
+ }
+ return locations;
+ }
+
+ private static Block[] readBlocks(
+ DataInputStream in,
+ int logVersion) throws IOException {
+ int numBlocks = in.readInt();
+ Block[] blocks = new Block[numBlocks];
+ for (int i = 0; i < numBlocks; i++) {
+ Block blk = new Block();
+ if (logVersion <= -14) {
+ blk.readFields(in);
+ } else {
+ BlockTwo oldblk = new BlockTwo();
+ oldblk.readFields(in);
+ blk.set(oldblk.blkid, oldblk.len,
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+ }
+ blocks[i] = blk;
+ }
+ return blocks;
+ }
+ }
+
+ static class SetReplicationOp extends FSEditLogOp {
+ String path;
+ short replication;
+
+ private SetReplicationOp() {
+ super(OP_SET_REPLICATION);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.path = FSImageSerialization.readString(in);
+ this.replication = readShort(in);
+ }
+ }
+
+ static class ConcatDeleteOp extends FSEditLogOp {
+ int length;
+ String trg;
+ String[] srcs;
+ long timestamp;
+
+ private ConcatDeleteOp() {
+ super(OP_CONCAT_DELETE);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.length = in.readInt();
+ if (length < 3) { // trg, srcs..., timestamp
+ throw new IOException("Incorrect data format. "
+ + "Concat delete operation.");
+ }
+ this.trg = FSImageSerialization.readString(in);
+ int srcSize = this.length - 1 - 1; //trg and timestamp
+ this.srcs = new String [srcSize];
+ for(int i=0; i<srcSize;i++) {
+ srcs[i]= FSImageSerialization.readString(in);
+ }
+ this.timestamp = readLong(in);
+ }
+ }
+
+ static class RenameOldOp extends FSEditLogOp {
+ int length;
+ String src;
+ String dst;
+ long timestamp;
+
+ private RenameOldOp() {
+ super(OP_RENAME_OLD);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.length = in.readInt();
+ if (this.length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Old rename operation.");
+ }
+ this.src = FSImageSerialization.readString(in);
+ this.dst = FSImageSerialization.readString(in);
+ this.timestamp = readLong(in);
+ }
+ }
+
+ static class DeleteOp extends FSEditLogOp {
+ int length;
+ String path;
+ long timestamp;
+
+ private DeleteOp() {
+ super(OP_DELETE);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+
+ this.length = in.readInt();
+ if (this.length != 2) {
+ throw new IOException("Incorrect data format. "
+ + "delete operation.");
+ }
+ this.path = FSImageSerialization.readString(in);
+ this.timestamp = readLong(in);
+ }
+ }
+
+ static class MkdirOp extends FSEditLogOp {
+ int length;
+ String path;
+ long timestamp;
+ long atime;
+ PermissionStatus permissions;
+
+ private MkdirOp() {
+ super(OP_MKDIR);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+
+ this.length = in.readInt();
+ if (-17 < logVersion && length != 2 ||
+ logVersion <= -17 && length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
+ }
+ this.path = FSImageSerialization.readString(in);
+ this.timestamp = readLong(in);
+
+ // The disk format stores atimes for directories as well.
+ // However, currently this is not being updated/used because of
+ // performance reasons.
+ if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
+ this.atime = readLong(in);
+ } else {
+ this.atime = 0;
+ }
+
+ if (logVersion <= -11) {
+ this.permissions = PermissionStatus.read(in);
+ } else {
+ this.permissions = null;
+ }
+ }
+ }
+
+ static class SetGenstampOp extends FSEditLogOp {
+ long genStamp;
+
+ private SetGenstampOp() {
+ super(OP_SET_GENSTAMP);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.genStamp = in.readLong();
+ }
+ }
+
+ static class DatanodeAddOp extends FSEditLogOp {
+ @SuppressWarnings("deprecation")
+ private DatanodeAddOp() {
+ super(OP_DATANODE_ADD);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ //Datanodes are not persistent any more.
+ FSImageSerialization.DatanodeImage.skipOne(in);
+ }
+ }
+
+ static class DatanodeRemoveOp extends FSEditLogOp {
+ @SuppressWarnings("deprecation")
+ private DatanodeRemoveOp() {
+ super(OP_DATANODE_REMOVE);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ DatanodeID nodeID = new DatanodeID();
+ nodeID.readFields(in);
+ //Datanodes are not persistent any more.
+ }
+ }
+
+ static class SetPermissionsOp extends FSEditLogOp {
+ String src;
+ FsPermission permissions;
+
+ private SetPermissionsOp() {
+ super(OP_SET_PERMISSIONS);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ this.permissions = FsPermission.read(in);
+ }
+ }
+
+ static class SetOwnerOp extends FSEditLogOp {
+ String src;
+ String username;
+ String groupname;
+
+ private SetOwnerOp() {
+ super(OP_SET_OWNER);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ this.username = FSImageSerialization.readString_EmptyAsNull(in);
+ this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
+ }
+
+ }
+
+ static class SetNSQuotaOp extends FSEditLogOp {
+ String src;
+ long nsQuota;
+
+ private SetNSQuotaOp() {
+ super(OP_SET_NS_QUOTA);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ this.nsQuota = readLongWritable(in);
+ }
+ }
+
+ static class ClearNSQuotaOp extends FSEditLogOp {
+ String src;
+
+ private ClearNSQuotaOp() {
+ super(OP_CLEAR_NS_QUOTA);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ }
+ }
+
+ static class SetQuotaOp extends FSEditLogOp {
+ String src;
+ long nsQuota;
+ long dsQuota;
+
+ private SetQuotaOp() {
+ super(OP_SET_QUOTA);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.src = FSImageSerialization.readString(in);
+ this.nsQuota = readLongWritable(in);
+ this.dsQuota = readLongWritable(in);
+ }
+ }
+
+ static class TimesOp extends FSEditLogOp {
+ int length;
+ String path;
+ long mtime;
+ long atime;
+
+ private TimesOp() {
+ super(OP_TIMES);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.length = in.readInt();
+ if (length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "times operation.");
+ }
+ this.path = FSImageSerialization.readString(in);
+ this.mtime = readLong(in);
+ this.atime = readLong(in);
+ }
+ }
+
+ static class SymlinkOp extends FSEditLogOp {
+ int length;
+ String path;
+ String value;
+ long mtime;
+ long atime;
+ PermissionStatus permissionStatus;
+
+ private SymlinkOp() {
+ super(OP_SYMLINK);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+
+ this.length = in.readInt();
+ if (this.length != 4) {
+ throw new IOException("Incorrect data format. "
+ + "symlink operation.");
+ }
+ this.path = FSImageSerialization.readString(in);
+ this.value = FSImageSerialization.readString(in);
+ this.mtime = readLong(in);
+ this.atime = readLong(in);
+ this.permissionStatus = PermissionStatus.read(in);
+ }
+ }
+
+ static class RenameOp extends FSEditLogOp {
+ int length;
+ String src;
+ String dst;
+ long timestamp;
+ Rename[] options;
+
+ private RenameOp() {
+ super(OP_RENAME);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.length = in.readInt();
+ if (this.length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Rename operation.");
+ }
+ this.src = FSImageSerialization.readString(in);
+ this.dst = FSImageSerialization.readString(in);
+ this.timestamp = readLong(in);
+ this.options = readRenameOptions(in);
+ }
+
+ private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+ BytesWritable writable = new BytesWritable();
+ writable.readFields(in);
+
+ byte[] bytes = writable.getBytes();
+ Rename[] options = new Rename[bytes.length];
+
+ for (int i = 0; i < bytes.length; i++) {
+ options[i] = Rename.valueOf(bytes[i]);
+ }
+ return options;
+ }
+ }
+
+ static class ReassignLeaseOp extends FSEditLogOp {
+ String leaseHolder;
+ String path;
+ String newHolder;
+
+ private ReassignLeaseOp() {
+ super(OP_REASSIGN_LEASE);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.leaseHolder = FSImageSerialization.readString(in);
+ this.path = FSImageSerialization.readString(in);
+ this.newHolder = FSImageSerialization.readString(in);
+ }
+ }
+
+ static class GetDelegationTokenOp extends FSEditLogOp {
+ DelegationTokenIdentifier token;
+ long expiryTime;
+
+ private GetDelegationTokenOp() {
+ super(OP_GET_DELEGATION_TOKEN);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.token = new DelegationTokenIdentifier();
+ this.token.readFields(in);
+ this.expiryTime = readLong(in);
+ }
+ }
+
+ static class RenewDelegationTokenOp extends FSEditLogOp {
+ DelegationTokenIdentifier token;
+ long expiryTime;
+
+ private RenewDelegationTokenOp() {
+ super(OP_RENEW_DELEGATION_TOKEN);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.token = new DelegationTokenIdentifier();
+ this.token.readFields(in);
+ this.expiryTime = readLong(in);
+ }
+ }
+
+ static class CancelDelegationTokenOp extends FSEditLogOp {
+ DelegationTokenIdentifier token;
+
+ private CancelDelegationTokenOp() {
+ super(OP_CANCEL_DELEGATION_TOKEN);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.token = new DelegationTokenIdentifier();
+ this.token.readFields(in);
+ }
+ }
+
+ static class UpdateMasterKeyOp extends FSEditLogOp {
+ DelegationKey key;
+
+ private UpdateMasterKeyOp() {
+ super(OP_UPDATE_MASTER_KEY);
+ }
+
+ public void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.key = new DelegationKey();
+ this.key.readFields(in);
+ }
+ }
+
+ static private short readShort(DataInputStream in) throws IOException {
+ return Short.parseShort(FSImageSerialization.readString(in));
+ }
+
+ static private long readLong(DataInputStream in) throws IOException {
+ return Long.parseLong(FSImageSerialization.readString(in));
+ }
+
+ /**
+ * A class to read in blocks stored in the old format. The only two
+ * fields in the block were blockid and length.
+ */
+ static class BlockTwo implements Writable {
+ long blkid;
+ long len;
+
+ static { // register a ctor
+ WritableFactories.setFactory
+ (BlockTwo.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new BlockTwo(); }
+ });
+ }
+
+
+ BlockTwo() {
+ blkid = 0;
+ len = 0;
+ }
+ /////////////////////////////////////
+ // Writable
+ /////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(blkid);
+ out.writeLong(len);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.blkid = in.readLong();
+ this.len = in.readLong();
+ }
+ }
+
+ // a place holder for reading a long
+ private static final LongWritable longWritable = new LongWritable();
+
+ /** Read a long from an input stream */
+ private static long readLongWritable(DataInputStream in) throws IOException {
+ synchronized (longWritable) {
+ longWritable.readFields(in);
+ return longWritable.get();
+ }
+ }
+
+ /**
+ * Class for reading editlog ops from a stream
+ */
+ public static class Reader {
+ private final DataInputStream in;
+ private final int logVersion;
+ private final Checksum checksum;
+ private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+ /**
+ * Construct the reader
+ * @param in The stream to read from.
+ * @param logVersion The version of the data coming from the stream.
+ * @param checksum Checksum being used with input stream.
+ */
+ @SuppressWarnings("deprecation")
+ public Reader(DataInputStream in, int logVersion,
+ Checksum checksum) {
+ this.in = in;
+ this.logVersion = logVersion;
+ this.checksum = checksum;
+ opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
+ FSEditLogOpCodes.class);
+ opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
+ opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
+ opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
+ opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+ opInstances.put(OP_RENAME_OLD, new RenameOldOp());
+ opInstances.put(OP_DELETE, new DeleteOp());
+ opInstances.put(OP_MKDIR, new MkdirOp());
+ opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+ opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+ opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+ opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+ opInstances.put(OP_SET_OWNER, new SetOwnerOp());
+ opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+ opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+ opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
+ opInstances.put(OP_TIMES, new TimesOp());
+ opInstances.put(OP_SYMLINK, new SymlinkOp());
+ opInstances.put(OP_RENAME, new RenameOp());
+ opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+ opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+ opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+ opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
+ new CancelDelegationTokenOp());
+ opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+ }
+
+ /**
+ * Read an operation from the input stream.
+ *
+ * Note that the objects returned from this method may be re-used by future
+ * calls to the same method.
+ *
+ * @return the operation read from the stream, or null at the end of the file
+ * @throws IOException on error.
+ */
+ public FSEditLogOp readOp() throws IOException {
+ if (checksum != null) {
+ checksum.reset();
+ }
+
+ in.mark(1);
+
+ byte opCodeByte;
+ try {
+ opCodeByte = in.readByte();
+ } catch (EOFException eof) {
+ // EOF at an opcode boundary is expected.
+ return null;
+ }
+
+ FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+ if (opCode == OP_INVALID) {
+ in.reset(); // reset back to end of file if somebody reads it again
+ return null;
+ }
+
+ FSEditLogOp op = opInstances.get(opCode);
+ if (op == null) {
+ throw new IOException("Read invalid opcode " + opCode);
+ }
+ op.readFields(in, logVersion);
+
+ validateChecksum(in, checksum);
+ return op;
+ }
+
+ /**
+ * Validate a transaction's checksum
+ */
+ private void validateChecksum(DataInputStream in,
+ Checksum checksum)
+ throws IOException {
+ if (checksum != null) {
+ int calculatedChecksum = (int)checksum.getValue();
+ int readChecksum = in.readInt(); // read in checksum
+ if (readChecksum != calculatedChecksum) {
+ throw new ChecksumException(
+ "Transaction is corrupt. Calculated checksum is " +
+ calculatedChecksum + " but read checksum " + readChecksum, -1);
+ }
+ }
+ }
+ }
+}
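As the readOp() javadoc above warns, the Reader hands back pre-allocated op instances from its EnumMap registry, so a returned op is only valid until the next readOp() call. A caller sketch mirroring the new FSEditLogLoader loop (the in, logVersion, and checksum setup is assumed):

    FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion, checksum);
    FSEditLogOp op;
    while ((op = reader.readOp()) != null) {
        switch (op.opCode) {
        case OP_DELETE:
            DeleteOp deleteOp = (DeleteOp) op;
            // Copy out any fields needed beyond this iteration --
            // the same DeleteOp object is refilled by the next readOp().
            fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
            break;
        // ... one case per opcode ...
        }
    }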
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Thu Jun 9 18:36:16 2011
@@ -24,8 +24,11 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
@@ -75,4 +78,50 @@ public class TestFSEditLogLoader {
e.getMessage().matches(expectedErrorMessage));
}
}
+
+ /**
+ * Test that, if the NN restarts with a new minimum replication,
+ * any files created with the old replication count will get
+ * automatically bumped up to the new minimum upon restart.
+ */
+ @Test
+ public void testReplicationAdjusted() throws IOException {
+ // start a cluster
+ Configuration conf = new HdfsConfiguration();
+ // Replicate and heartbeat fast to shave a few seconds off test
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ // Create a file with replication count 1
+ Path p = new Path("/testfile");
+ DFSTestUtil.createFile(fs, p, 10, /*repl*/ (short)1, 1);
+ DFSTestUtil.waitReplication(fs, p, (short)1);
+
+ // Shut down and restart cluster with new minimum replication of 2
+ cluster.shutdown();
+ cluster = null;
+
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .format(false).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+
+ // The file should get adjusted to replication 2 when
+ // the edit log is replayed.
+ DFSTestUtil.waitReplication(fs, p, (short)2);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
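The new test relies on the loader calling fsNamesys.adjustReplication() while replaying OP_ADD/OP_CLOSE, which clamps the logged replication into the configured minimum/maximum range. A hypothetical sketch of that clamp (the real method lives on FSNamesystem and reads its bounds from configuration):

    short adjustReplication(short logged, short minReplication,
                            short maxReplication) {
        if (logged < minReplication) {
            return minReplication; // this is what bumps the file from 1 to 2
        }
        return logged > maxReplication ? maxReplication : logged;
    }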