You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/09 20:36:17 UTC

svn commit: r1134031 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: todd
Date: Thu Jun  9 18:36:16 2011
New Revision: 1134031

URL: http://svn.apache.org/viewvc?rev=1134031&view=rev
Log:
HDFS-2003. Separate FSEditLog reading logic from edit log memory state building logic. Contributed by Ivan Kelly.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Jun  9 18:36:16 2011
@@ -493,6 +493,9 @@ Trunk (unreleased changes)
     HDFS-1586. Add InterfaceAudience and InterfaceStability annotations to 
     MiniDFSCluster. (suresh)
 
+    HDFS-2003. Separate FSEditLog reading logic from edit log memory state
+    building logic. (Ivan Kelly via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Jun  9 18:36:16 2011
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.BufferedInputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -29,27 +27,15 @@ import java.util.Arrays;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 public class FSEditLogLoader {
   private final FSNamesystem fsNamesys;
@@ -80,7 +66,7 @@ public class FSEditLogLoader {
    */
   int readLogVersion(DataInputStream in) throws IOException {
     int logVersion = 0;
-    // Read log file version. Could be missing. 
+    // Read log file version. Could be missing.
     in.mark(4);
     // If edits log is greater than 2G, available method will return negative
     // numbers, so we avoid having to call available
@@ -96,7 +82,7 @@ public class FSEditLogLoader {
       if (logVersion < FSConstants.LAYOUT_VERSION) // future version
         throw new IOException(
             "Unexpected version of the file system log file: "
-            + logVersion + ". Current version = " 
+            + logVersion + ". Current version = "
             + FSConstants.LAYOUT_VERSION + ".");
     }
     assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
@@ -107,7 +93,7 @@ public class FSEditLogLoader {
   int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
     BufferedInputStream bin = new BufferedInputStream(edits);
     DataInputStream in = new DataInputStream(bin);
-    
+
     int numEdits = 0;
     int logVersion = 0;
 
@@ -134,9 +120,7 @@ public class FSEditLogLoader {
       Checksum checksum, boolean closeOnExit) throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
-    String clientName = null;
-    String clientMachine = null;
-    String path = null;
+
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
         numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
@@ -154,57 +138,41 @@ public class FSEditLogLoader {
 
     try {
       try {
-        while (true) {
-          long timestamp = 0;
-          long mtime = 0;
-          long atime = 0;
-          long blockSize = 0;
-          FSEditLogOpCodes opCode;
-          try {
-            if (checksum != null) {
-              checksum.reset();
-            }
-            in.mark(1);
-            byte opCodeByte = in.readByte();
-            opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-            if (opCode == FSEditLogOpCodes.OP_INVALID) {
-              in.reset(); // reset back to end of file if somebody reads it again
-              break; // no more transactions
-            }
-          } catch (EOFException e) {
-            break; // no more transactions
-          }
+        FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
+                                                           checksum);
+        FSEditLogOp op;
+        while ((op = reader.readOp()) != null) {
           recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
               tracker.getPos();
           numEdits++;
-          switch (opCode) {
+          switch (op.opCode) {
           case OP_ADD:
           case OP_CLOSE: {
+            AddCloseOp addCloseOp = (AddCloseOp)op;
+
             // versions > 0 support per file replication
             // get name and replication
-            int length = in.readInt();
-            if (-7 == logVersion && length != 3||
-                -17 < logVersion && logVersion < -7 && length != 4 ||
-                logVersion <= -17 && length != 5) {
-                throw new IOException("Incorrect data format."  +
-                                      " logVersion is " + logVersion +
-                                      " but writables.length is " +
-                                      length + ". ");
-            }
-            path = FSImageSerialization.readString(in);
-            short replication = fsNamesys.adjustReplication(readShort(in));
-            mtime = readLong(in);
-            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-              atime = readLong(in);
+            short replication
+              = fsNamesys.adjustReplication(addCloseOp.replication);
+
+            long blockSize = addCloseOp.blockSize;
+            BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
+            for (int i = 0; i < addCloseOp.blocks.length; i++) {
+              if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
+                 && i == addCloseOp.blocks.length-1) {
+                blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
+                                                           replication);
+              } else {
+                blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+              }
             }
-            if (logVersion < -7) {
-              blockSize = readLong(in);
+
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            if (addCloseOp.permissions != null) {
+              permissions = addCloseOp.permissions;
             }
-            // get blocks
-            boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
-            BlockInfo blocks[] = 
-              readBlocks(in, logVersion, isFileUnderConstruction, replication);
-  
+
+
             // Older versions of HDFS does not store the block size in inode.
             // If the file has more than one block, use the size of the
             // first block as the blocksize. Otherwise use the default
@@ -217,41 +185,25 @@ public class FSEditLogLoader {
                 blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
               }
             }
-             
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
-            }
-  
-            // clientname, clientMachine and block locations of last block.
-            if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
-              clientName = FSImageSerialization.readString(in);
-              clientMachine = FSImageSerialization.readString(in);
-              if (-13 <= logVersion) {
-                readDatanodeDescriptorArray(in);
-              }
-            } else {
-              clientName = "";
-              clientMachine = "";
-            }
-  
+
+
             // The open lease transaction re-creates a file if necessary.
             // Delete the file if it already exists.
             if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG.debug(opCode + ": " + path + 
-                                     " numblocks : " + blocks.length +
-                                     " clientHolder " +  clientName +
-                                     " clientMachine " + clientMachine);
+              FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+                  " numblocks : " + blocks.length +
+                  " clientHolder " + addCloseOp.clientName +
+                  " clientMachine " + addCloseOp.clientMachine);
             }
-  
-            fsDir.unprotectedDelete(path, mtime);
-  
+
+            fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
+
             // add to the file tree
             INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                      path, permissions,
-                                                      blocks, replication, 
-                                                      mtime, atime, blockSize);
-            if (isFileUnderConstruction) {
+                addCloseOp.path, permissions,
+                blocks, replication,
+                addCloseOp.mtime, addCloseOp.atime, blockSize);
+            if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
               numOpAdd++;
               //
               // Replace current node with a INodeUnderConstruction.
@@ -259,247 +211,193 @@ public class FSEditLogLoader {
               //
               INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
                                         node.getLocalNameBytes(),
-                                        node.getReplication(), 
+                                        node.getReplication(),
                                         node.getModificationTime(),
                                         node.getPreferredBlockSize(),
                                         node.getBlocks(),
                                         node.getPermissionStatus(),
-                                        clientName, 
-                                        clientMachine, 
+                                        addCloseOp.clientName,
+                                        addCloseOp.clientMachine,
                                         null);
-              fsDir.replaceNode(path, node, cons);
-              fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+              fsDir.replaceNode(addCloseOp.path, node, cons);
+              fsNamesys.leaseManager.addLease(cons.getClientName(),
+                                              addCloseOp.path);
             }
             break;
-          } 
+          }
           case OP_SET_REPLICATION: {
             numOpSetRepl++;
-            path = FSImageSerialization.readString(in);
-            short replication = fsNamesys.adjustReplication(readShort(in));
-            fsDir.unprotectedSetReplication(path, replication, null);
+            SetReplicationOp setReplicationOp = (SetReplicationOp)op;
+            short replication
+              = fsNamesys.adjustReplication(setReplicationOp.replication);
+            fsDir.unprotectedSetReplication(setReplicationOp.path,
+                                            replication, null);
             break;
-          } 
+          }
           case OP_CONCAT_DELETE: {
             numOpConcatDelete++;
-            int length = in.readInt();
-            if (length < 3) { // trg, srcs.., timestam
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String trg = FSImageSerialization.readString(in);
-            int srcSize = length - 1 - 1; //trg and timestamp
-            String [] srcs = new String [srcSize];
-            for(int i=0; i<srcSize;i++) {
-              srcs[i]= FSImageSerialization.readString(in);
-            }
-            timestamp = readLong(in);
-            fsDir.unprotectedConcat(trg, srcs);
+
+            ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
+            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
             break;
           }
           case OP_RENAME_OLD: {
             numOpRenameOld++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String s = FSImageSerialization.readString(in);
-            String d = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-            fsDir.unprotectedRenameTo(s, d, timestamp);
-            fsNamesys.changeLease(s, d, dinfo);
+            RenameOldOp renameOp = (RenameOldOp)op;
+            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                      renameOp.timestamp);
+            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_DELETE: {
             numOpDelete++;
-            int length = in.readInt();
-            if (length != 2) {
-              throw new IOException("Incorrect data format. " 
-                                    + "delete operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            fsDir.unprotectedDelete(path, timestamp);
+
+            DeleteOp deleteOp = (DeleteOp)op;
+            fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
             break;
           }
           case OP_MKDIR: {
             numOpMkDir++;
+            MkdirOp mkdirOp = (MkdirOp)op;
             PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            int length = in.readInt();
-            if (-17 < logVersion && length != 2 ||
-                logVersion <= -17 && length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-  
-            // The disk format stores atimes for directories as well.
-            // However, currently this is not being updated/used because of
-            // performance reasons.
-            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-              atime = readLong(in);
-            }
-  
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
+            if (mkdirOp.permissions != null) {
+              permissions = mkdirOp.permissions;
             }
-            fsDir.unprotectedMkdir(path, permissions, timestamp);
+
+            fsDir.unprotectedMkdir(mkdirOp.path, permissions,
+                                   mkdirOp.timestamp);
             break;
           }
           case OP_SET_GENSTAMP: {
             numOpSetGenStamp++;
-            long lw = in.readLong();
-            fsNamesys.setGenerationStamp(lw);
-            break;
-          } 
-          case OP_DATANODE_ADD: {
-            numOpOther++;
-            //Datanodes are not persistent any more.
-            FSImageSerialization.DatanodeImage.skipOne(in);
-            break;
-          }
-          case OP_DATANODE_REMOVE: {
-            numOpOther++;
-            DatanodeID nodeID = new DatanodeID();
-            nodeID.readFields(in);
-            //Datanodes are not persistent any more.
+            SetGenstampOp setGenstampOp = (SetGenstampOp)op;
+            fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
             break;
           }
           case OP_SET_PERMISSIONS: {
             numOpSetPerm++;
-            fsDir.unprotectedSetPermission(
-                FSImageSerialization.readString(in), FsPermission.read(in));
+
+            SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
+            fsDir.unprotectedSetPermission(setPermissionsOp.src,
+                                           setPermissionsOp.permissions);
             break;
           }
           case OP_SET_OWNER: {
             numOpSetOwner++;
-            if (logVersion > -11)
-              throw new IOException("Unexpected opCode " + opCode
-                                    + " for version " + logVersion);
-            fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
-                FSImageSerialization.readString_EmptyAsNull(in),
-                FSImageSerialization.readString_EmptyAsNull(in));
+
+            SetOwnerOp setOwnerOp = (SetOwnerOp)op;
+            fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
+                                      setOwnerOp.groupname);
             break;
           }
           case OP_SET_NS_QUOTA: {
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
-                                      readLongWritable(in), 
+            SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
+            fsDir.unprotectedSetQuota(setNSQuotaOp.src,
+                                      setNSQuotaOp.nsQuota,
                                       FSConstants.QUOTA_DONT_SET);
             break;
           }
           case OP_CLEAR_NS_QUOTA: {
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+            ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
+            fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
                                       FSConstants.QUOTA_RESET,
                                       FSConstants.QUOTA_DONT_SET);
             break;
           }
-  
+
           case OP_SET_QUOTA:
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                      readLongWritable(in),
-                                      readLongWritable(in));
-                                        
+            SetQuotaOp setQuotaOp = (SetQuotaOp)op;
+            fsDir.unprotectedSetQuota(setQuotaOp.src,
+                                      setQuotaOp.nsQuota,
+                                      setQuotaOp.dsQuota);
             break;
-  
+
           case OP_TIMES: {
             numOpTimes++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "times operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            mtime = readLong(in);
-            atime = readLong(in);
-            fsDir.unprotectedSetTimes(path, mtime, atime, true);
+            TimesOp timesOp = (TimesOp)op;
+
+            fsDir.unprotectedSetTimes(timesOp.path,
+                                      timesOp.mtime,
+                                      timesOp.atime, true);
             break;
           }
           case OP_SYMLINK: {
             numOpSymlink++;
-            int length = in.readInt();
-            if (length != 4) {
-              throw new IOException("Incorrect data format. " 
-                                    + "symlink operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            String value = FSImageSerialization.readString(in);
-            mtime = readLong(in);
-            atime = readLong(in);
-            PermissionStatus perm = PermissionStatus.read(in);
-            fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+
+            SymlinkOp symlinkOp = (SymlinkOp)op;
+            fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
+                                     symlinkOp.mtime, symlinkOp.atime,
+                                     symlinkOp.permissionStatus);
             break;
           }
           case OP_RENAME: {
             numOpRename++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String s = FSImageSerialization.readString(in);
-            String d = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            Rename[] options = readRenameOptions(in);
-            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-            fsDir.unprotectedRenameTo(s, d, timestamp, options);
-            fsNamesys.changeLease(s, d, dinfo);
+            RenameOp renameOp = (RenameOp)op;
+
+            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                      renameOp.timestamp, renameOp.options);
+            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
             numOpGetDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
-            long expiryTime = readLong(in);
+            GetDelegationTokenOp getDelegationTokenOp
+              = (GetDelegationTokenOp)op;
+
             fsNamesys.getDelegationTokenSecretManager()
-                .addPersistedDelegationToken(delegationTokenId, expiryTime);
+              .addPersistedDelegationToken(getDelegationTokenOp.token,
+                                           getDelegationTokenOp.expiryTime);
             break;
           }
           case OP_RENEW_DELEGATION_TOKEN: {
             numOpRenewDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
-            long expiryTime = readLong(in);
+
+            RenewDelegationTokenOp renewDelegationTokenOp
+              = (RenewDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
-                .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+              .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
+                                           renewDelegationTokenOp.expiryTime);
             break;
           }
           case OP_CANCEL_DELEGATION_TOKEN: {
             numOpCancelDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
+
+            CancelDelegationTokenOp cancelDelegationTokenOp
+              = (CancelDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
-                .updatePersistedTokenCancellation(delegationTokenId);
+                .updatePersistedTokenCancellation(
+                    cancelDelegationTokenOp.token);
             break;
           }
           case OP_UPDATE_MASTER_KEY: {
             numOpUpdateMasterKey++;
-            DelegationKey delegationKey = new DelegationKey();
-            delegationKey.readFields(in);
-            fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
-                delegationKey);
+            UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
+            fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedMasterKey(updateMasterKeyOp.key);
             break;
           }
           case OP_REASSIGN_LEASE: {
             numOpReassignLease++;
-            String leaseHolder = FSImageSerialization.readString(in);
-            path = FSImageSerialization.readString(in);
-            String newHolder = FSImageSerialization.readString(in);
-            Lease lease = fsNamesys.leaseManager.getLease(leaseHolder);
+            ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
+
+            Lease lease = fsNamesys.leaseManager.getLease(
+                reassignLeaseOp.leaseHolder);
             INodeFileUnderConstruction pendingFile =
-                (INodeFileUnderConstruction) fsDir.getFileINode(path);
-            fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile);
+                (INodeFileUnderConstruction) fsDir.getFileINode(
+                    reassignLeaseOp.path);
+            fsNamesys.reassignLeaseInternal(lease,
+                reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
             break;
           }
-          default: {
-            throw new IOException("Never seen opCode " + opCode);
-          }
+          case OP_DATANODE_ADD:
+          case OP_DATANODE_REMOVE:
+            numOpOther++;
+            break;
+          default:
+            throw new IOException("Invalid operation read " + op.opCode);
           }
-          validateChecksum(in, checksum, numEdits);
         }
       } catch (IOException ex) {
         check203UpgradeFailure(logVersion, ex);
@@ -547,126 +445,6 @@ public class FSEditLogLoader {
   }
 
   /**
-   * Validate a transaction's checksum
-   */
-  private static void validateChecksum(
-      DataInputStream in, Checksum checksum, int tid)
-  throws IOException {
-    if (checksum != null) {
-      int calculatedChecksum = (int)checksum.getValue();
-      int readChecksum = in.readInt(); // read in checksum
-      if (readChecksum != calculatedChecksum) {
-        throw new ChecksumException(
-            "Transaction " + tid + " is corrupt. Calculated checksum is " +
-            calculatedChecksum + " but read checksum " + readChecksum, tid);
-      }
-    }
-  }
-
-  /**
-   * A class to read in blocks stored in the old format. The only two
-   * fields in the block were blockid and length.
-   */
-  static class BlockTwo implements Writable {
-    long blkid;
-    long len;
-
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (BlockTwo.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new BlockTwo(); }
-         });
-    }
-
-
-    BlockTwo() {
-      blkid = 0;
-      len = 0;
-    }
-    /////////////////////////////////////
-    // Writable
-    /////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-      out.writeLong(blkid);
-      out.writeLong(len);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      this.blkid = in.readLong();
-      this.len = in.readLong();
-    }
-  }
-
-  /** This method is defined for compatibility reason. */
-  static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
-      ) throws IOException {
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = new DatanodeDescriptor();
-      locations[i].readFieldsFromFSEditLog(in);
-    }
-    return locations;
-  }
-
-  static private short readShort(DataInputStream in) throws IOException {
-    return Short.parseShort(FSImageSerialization.readString(in));
-  }
-
-  static private long readLong(DataInputStream in) throws IOException {
-    return Long.parseLong(FSImageSerialization.readString(in));
-  }
-  
-  // a place holder for reading a long
-  private static final LongWritable longWritable = new LongWritable();
-
-  /** Read an integer from an input stream */
-  private static long readLongWritable(DataInputStream in) throws IOException {
-    synchronized (longWritable) {
-      longWritable.readFields(in);
-      return longWritable.get();
-    }
-  }
-  
-  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
-    BytesWritable writable = new BytesWritable();
-    writable.readFields(in);
-    
-    byte[] bytes = writable.getBytes();
-    Rename[] options = new Rename[bytes.length];
-    
-    for (int i = 0; i < bytes.length; i++) {
-      options[i] = Rename.valueOf(bytes[i]);
-    }
-    return options;
-  }
-
-  static private BlockInfo[] readBlocks(
-      DataInputStream in,
-      int logVersion,
-      boolean isFileUnderConstruction,
-      short replication) throws IOException {
-    int numBlocks = in.readInt();
-    BlockInfo[] blocks = new BlockInfo[numBlocks];
-    Block blk = new Block();
-    BlockTwo oldblk = new BlockTwo();
-    for (int i = 0; i < numBlocks; i++) {
-      if (logVersion <= -14) {
-        blk.readFields(in);
-      } else {
-        oldblk.readFields(in);
-        blk.set(oldblk.blkid, oldblk.len,
-                GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-      }
-      if(isFileUnderConstruction && i == numBlocks-1)
-        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
-      else
-        blocks[i] = new BlockInfo(blk, replication);
-    }
-    return blocks;
-  }
-  
-  /** 
    * Throw appropriate exception during upgrade from 203, when editlog loading
    * could fail due to opcode conflicts.
    */

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1134031&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Jun  9 18:36:16 2011
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.zip.Checksum;
+import java.util.EnumMap;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+
+/**
+ * Helper classes for reading the ops from an InputStream.
+ * All ops derive from FSEditLogOp; they are instantiated only by
+ * Reader and are obtained through Reader#readOp().
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class FSEditLogOp {
+  final FSEditLogOpCodes opCode;
+
+  /**
+   * Constructor for an EditLog Op. EditLog ops cannot be constructed
+   * directly; they are only obtained through Reader#readOp.
+   */
+  private FSEditLogOp(FSEditLogOpCodes opCode) {
+    this.opCode = opCode;
+  }
+
+  public abstract void readFields(DataInputStream in, int logVersion)
+      throws IOException;
+
+  static class AddCloseOp extends FSEditLogOp {
+    int length;
+    String path;
+    short replication;
+    long mtime;
+    long atime;
+    long blockSize;
+    Block[] blocks;
+    PermissionStatus permissions;
+    String clientName;
+    String clientMachine;
+    //final DatanodeDescriptor[] dataNodeDescriptors; UNUSED
+
+    private AddCloseOp(FSEditLogOpCodes opCode) {
+      super(opCode);
+      assert(opCode == OP_ADD || opCode == OP_CLOSE);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      // versions > 0 support per file replication
+      // get name and replication
+      this.length = in.readInt();
+      if (-7 == logVersion && length != 3||
+          -17 < logVersion && logVersion < -7 && length != 4 ||
+          logVersion <= -17 && length != 5) {
+        throw new IOException("Incorrect data format."  +
+                              " logVersion is " + logVersion +
+                              " but writables.length is " +
+                              length + ". ");
+      }
+      this.path = FSImageSerialization.readString(in);
+      this.replication = readShort(in);
+      this.mtime = readLong(in);
+      if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
+        this.atime = readLong(in);
+      } else {
+        this.atime = 0;
+      }
+      if (logVersion < -7) {
+        this.blockSize = readLong(in);
+      } else {
+        this.blockSize = 0;
+      }
+
+      // get blocks
+      this.blocks = readBlocks(in, logVersion);
+
+      if (logVersion <= -11) {
+        this.permissions = PermissionStatus.read(in);
+      } else {
+        this.permissions = null;
+      }
+
+      // clientname, clientMachine and block locations of last block.
+      if (this.opCode == OP_ADD && logVersion <= -12) {
+        this.clientName = FSImageSerialization.readString(in);
+        this.clientMachine = FSImageSerialization.readString(in);
+        if (-13 <= logVersion) {
+          readDatanodeDescriptorArray(in);
+        }
+      } else {
+        this.clientName = "";
+        this.clientMachine = "";
+      }
+    }
+
+    /** This method is defined for compatibility reasons. */
+    private static DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in)
+        throws IOException {
+      DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
+        for (int i = 0; i < locations.length; i++) {
+          locations[i] = new DatanodeDescriptor();
+          locations[i].readFieldsFromFSEditLog(in);
+        }
+        return locations;
+    }
+
+    private static Block[] readBlocks(
+        DataInputStream in,
+        int logVersion) throws IOException {
+      int numBlocks = in.readInt();
+      Block[] blocks = new Block[numBlocks];
+      for (int i = 0; i < numBlocks; i++) {
+        Block blk = new Block();
+        if (logVersion <= -14) {
+          blk.readFields(in);
+        } else {
+          BlockTwo oldblk = new BlockTwo();
+          oldblk.readFields(in);
+          blk.set(oldblk.blkid, oldblk.len,
+                  GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+        }
+        blocks[i] = blk;
+      }
+      return blocks;
+    }
+  }
+
+  static class SetReplicationOp extends FSEditLogOp {
+    String path;
+    short replication;
+
+    private SetReplicationOp() {
+      super(OP_SET_REPLICATION);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.replication = readShort(in);
+    }
+  }
+
+  static class ConcatDeleteOp extends FSEditLogOp {
+    int length;
+    String trg;
+    String[] srcs;
+    long timestamp;
+
+    private ConcatDeleteOp() {
+      super(OP_CONCAT_DELETE);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.length = in.readInt();
+      if (length < 3) { // trg, srcs.., timestamp
+        throw new IOException("Incorrect data format. "
+                              + "Concat delete operation.");
+      }
+      this.trg = FSImageSerialization.readString(in);
+      int srcSize = this.length - 1 - 1; //trg and timestamp
+      this.srcs = new String [srcSize];
+      for(int i=0; i<srcSize;i++) {
+        srcs[i]= FSImageSerialization.readString(in);
+      }
+      this.timestamp = readLong(in);
+    }
+  }
+
+  static class RenameOldOp extends FSEditLogOp {
+    int length;
+    String src;
+    String dst;
+    long timestamp;
+
+    private RenameOldOp() {
+      super(OP_RENAME_OLD);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.length = in.readInt();
+      if (this.length != 3) {
+        throw new IOException("Incorrect data format. "
+                              + "Old rename operation.");
+      }
+      this.src = FSImageSerialization.readString(in);
+      this.dst = FSImageSerialization.readString(in);
+      this.timestamp = readLong(in);
+    }
+  }
+
+  static class DeleteOp extends FSEditLogOp {
+    int length;
+    String path;
+    long timestamp;
+
+    private DeleteOp() {
+      super(OP_DELETE);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+
+      this.length = in.readInt();
+      if (this.length != 2) {
+        throw new IOException("Incorrect data format. "
+                              + "delete operation.");
+      }
+      this.path = FSImageSerialization.readString(in);
+      this.timestamp = readLong(in);
+    }
+  }
+
+  static class MkdirOp extends FSEditLogOp {
+    int length;
+    String path;
+    long timestamp;
+    long atime;
+    PermissionStatus permissions;
+
+    private MkdirOp() {
+      super(OP_MKDIR);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+
+      this.length = in.readInt();
+      if (-17 < logVersion && length != 2 ||
+          logVersion <= -17 && length != 3) {
+        throw new IOException("Incorrect data format. "
+                              + "Mkdir operation.");
+      }
+      this.path = FSImageSerialization.readString(in);
+      this.timestamp = readLong(in);
+
+      // The disk format stores atimes for directories as well.
+      // However, currently this is not being updated/used because of
+      // performance reasons.
+      if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
+        this.atime = readLong(in);
+      } else {
+        this.atime = 0;
+      }
+
+      if (logVersion <= -11) {
+        this.permissions = PermissionStatus.read(in);
+      } else {
+        this.permissions = null;
+      }
+    }
+  }
+
+  static class SetGenstampOp extends FSEditLogOp {
+    long genStamp;
+
+    private SetGenstampOp() {
+      super(OP_SET_GENSTAMP);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.genStamp = in.readLong();
+    }
+  }
+
+  static class DatanodeAddOp extends FSEditLogOp {
+    @SuppressWarnings("deprecation")
+    private DatanodeAddOp() {
+      super(OP_DATANODE_ADD);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      //Datanodes are not persistent any more.
+      FSImageSerialization.DatanodeImage.skipOne(in);
+    }
+  }
+
+  static class DatanodeRemoveOp extends FSEditLogOp {
+    @SuppressWarnings("deprecation")
+    private DatanodeRemoveOp() {
+      super(OP_DATANODE_REMOVE);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      DatanodeID nodeID = new DatanodeID();
+      nodeID.readFields(in);
+      //Datanodes are not persistent any more.
+    }
+  }
+
+  static class SetPermissionsOp extends FSEditLogOp {
+    String src;
+    FsPermission permissions;
+
+    private SetPermissionsOp() {
+      super(OP_SET_PERMISSIONS);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.src = FSImageSerialization.readString(in);
+      this.permissions = FsPermission.read(in);
+    }
+  }
+
+  static class SetOwnerOp extends FSEditLogOp {
+    String src;
+    String username;
+    String groupname;
+
+    private SetOwnerOp() {
+      super(OP_SET_OWNER);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.src = FSImageSerialization.readString(in);
+      this.username = FSImageSerialization.readString_EmptyAsNull(in);
+      this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
+    }
+
+  }
+
+  static class SetNSQuotaOp extends FSEditLogOp {
+    String src;
+    long nsQuota;
+
+    private SetNSQuotaOp() {
+      super(OP_SET_NS_QUOTA);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.src = FSImageSerialization.readString(in);
+      this.nsQuota = readLongWritable(in);
+    }
+  }
+
+  static class ClearNSQuotaOp extends FSEditLogOp {
+    String src;
+
+    private ClearNSQuotaOp() {
+      super(OP_CLEAR_NS_QUOTA);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.src = FSImageSerialization.readString(in);
+    }
+  }
+
+  static class SetQuotaOp extends FSEditLogOp {
+    String src;
+    long nsQuota;
+    long dsQuota;
+
+    private SetQuotaOp() {
+      super(OP_SET_QUOTA);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.src = FSImageSerialization.readString(in);
+      this.nsQuota = readLongWritable(in);
+      this.dsQuota = readLongWritable(in);
+    }
+  }
+
+  static class TimesOp extends FSEditLogOp {
+    int length;
+    String path;
+    long mtime;
+    long atime;
+
+    private TimesOp() {
+      super(OP_TIMES);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.length = in.readInt();
+      if (length != 3) {
+        throw new IOException("Incorrect data format. "
+                              + "times operation.");
+      }
+      this.path = FSImageSerialization.readString(in);
+      this.mtime = readLong(in);
+      this.atime = readLong(in);
+    }
+  }
+
+  static class SymlinkOp extends FSEditLogOp {
+    int length;
+    String path;
+    String value;
+    long mtime;
+    long atime;
+    PermissionStatus permissionStatus;
+
+    private SymlinkOp() {
+      super(OP_SYMLINK);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+
+      this.length = in.readInt();
+      if (this.length != 4) {
+        throw new IOException("Incorrect data format. "
+                              + "symlink operation.");
+      }
+      this.path = FSImageSerialization.readString(in);
+      this.value = FSImageSerialization.readString(in);
+      this.mtime = readLong(in);
+      this.atime = readLong(in);
+      this.permissionStatus = PermissionStatus.read(in);
+    }
+  }
+
+  static class RenameOp extends FSEditLogOp {
+    int length;
+    String src;
+    String dst;
+    long timestamp;
+    Rename[] options;
+
+    private RenameOp() {
+      super(OP_RENAME);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.length = in.readInt();
+      if (this.length != 3) {
+        throw new IOException("Incorrect data format. "
+                              + "Rename operation.");
+      }
+      this.src = FSImageSerialization.readString(in);
+      this.dst = FSImageSerialization.readString(in);
+      this.timestamp = readLong(in);
+      this.options = readRenameOptions(in);
+    }
+
+    /** Reads the rename options, one per valid byte of a serialized BytesWritable. */
+    private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+      BytesWritable writable = new BytesWritable();
+      writable.readFields(in);
+      // Only the first getLength() bytes of the backing buffer are valid data.
+      byte[] bytes = writable.getBytes();
+      Rename[] options = new Rename[writable.getLength()];
+      for (int i = 0; i < options.length; i++) {
+        options[i] = Rename.valueOf(bytes[i]);
+      }
+      return options;
+    }
+  }
+
+  static class ReassignLeaseOp extends FSEditLogOp {
+    String leaseHolder;
+    String path;
+    String newHolder;
+
+    private ReassignLeaseOp() {
+      super(OP_REASSIGN_LEASE);
+    }
+    
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.leaseHolder = FSImageSerialization.readString(in);
+      this.path = FSImageSerialization.readString(in);
+      this.newHolder = FSImageSerialization.readString(in);
+    }
+  }
+
+  static class GetDelegationTokenOp extends FSEditLogOp {
+    DelegationTokenIdentifier token;
+    long expiryTime;
+
+    private GetDelegationTokenOp() {
+      super(OP_GET_DELEGATION_TOKEN);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.token = new DelegationTokenIdentifier();
+      this.token.readFields(in);
+      this.expiryTime = readLong(in);
+    }
+  }
+
+  static class RenewDelegationTokenOp extends FSEditLogOp {
+    DelegationTokenIdentifier token;
+    long expiryTime;
+
+    private RenewDelegationTokenOp() {
+      super(OP_RENEW_DELEGATION_TOKEN);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.token = new DelegationTokenIdentifier();
+      this.token.readFields(in);
+      this.expiryTime = readLong(in);
+    }
+  }
+
+  static class CancelDelegationTokenOp extends FSEditLogOp {
+    DelegationTokenIdentifier token;
+
+    private CancelDelegationTokenOp() {
+      super(OP_CANCEL_DELEGATION_TOKEN);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.token = new DelegationTokenIdentifier();
+      this.token.readFields(in);
+    }
+  }
+
+  static class UpdateMasterKeyOp extends FSEditLogOp {
+    DelegationKey key;
+
+    private UpdateMasterKeyOp() {
+      super(OP_UPDATE_MASTER_KEY);
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.key = new DelegationKey();
+      this.key.readFields(in);
+    }
+  }
+  
+  static private short readShort(DataInputStream in) throws IOException {
+    return Short.parseShort(FSImageSerialization.readString(in));
+  }
+
+  static private long readLong(DataInputStream in) throws IOException {
+    return Long.parseLong(FSImageSerialization.readString(in));
+  }
+
+  /**
+   * A class to read in blocks stored in the old format. The only two
+   * fields in the block were blockid and length.
+   */
+  static class BlockTwo implements Writable {
+    long blkid;
+    long len;
+
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (BlockTwo.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new BlockTwo(); }
+         });
+    }
+
+
+    BlockTwo() {
+      blkid = 0;
+      len = 0;
+    }
+    /////////////////////////////////////
+    // Writable
+    /////////////////////////////////////
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(blkid);
+      out.writeLong(len);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      this.blkid = in.readLong();
+      this.len = in.readLong();
+    }
+  }
+
+  // A placeholder for reading a long.
+  private static final LongWritable longWritable = new LongWritable();
+
+  /** Read a long, serialized as a LongWritable, from an input stream */
+  private static long readLongWritable(DataInputStream in) throws IOException {
+    synchronized (longWritable) {
+      longWritable.readFields(in);
+      return longWritable.get();
+    }
+  }
+
+  /**
+   * Class for reading editlog ops from a stream
+   */
+  public static class Reader {
+    private final DataInputStream in;
+    private final int logVersion;
+    private final Checksum checksum;
+    private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+    /**
+     * Construct the reader
+     * @param in The stream to read from.
+     * @param logVersion The version of the data coming from the stream.
+     * @param checksum Checksum being used with input stream.
+     */
+    @SuppressWarnings("deprecation")
+    public Reader(DataInputStream in, int logVersion,
+                  Checksum checksum) {
+      this.in = in;
+      this.logVersion = logVersion;
+      this.checksum = checksum;
+      opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
+          FSEditLogOpCodes.class);
+      opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
+      opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
+      opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
+      opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+      opInstances.put(OP_RENAME_OLD, new RenameOldOp());
+      opInstances.put(OP_DELETE, new DeleteOp());
+      opInstances.put(OP_MKDIR, new MkdirOp());
+      opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+      opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+      opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+      opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+      opInstances.put(OP_SET_OWNER, new SetOwnerOp());
+      opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+      opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+      opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
+      opInstances.put(OP_TIMES, new TimesOp());
+      opInstances.put(OP_SYMLINK, new SymlinkOp());
+      opInstances.put(OP_RENAME, new RenameOp());
+      opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+      opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+      opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+      opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
+                      new CancelDelegationTokenOp());
+      opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+    }
+
+    /**
+     * Read an operation from the input stream.
+     * 
+     * Note that the objects returned from this method may be re-used by future
+     * calls to the same method.
+     * 
+     * @return the operation read from the stream, or null at the end of the file
+     * @throws IOException on error.
+     */
+    public FSEditLogOp readOp() throws IOException {
+      if (checksum != null) {
+        checksum.reset();
+      }
+
+      in.mark(1);
+
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return null;
+      }
+
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+      if (opCode == OP_INVALID) {
+        in.reset(); // reset back to end of file if somebody reads it again
+        return null;
+      }
+
+      FSEditLogOp op = opInstances.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
+      op.readFields(in, logVersion);
+
+      validateChecksum(in, checksum);
+      return op;
+    }
+
+    /**
+     * Validate a transaction's checksum
+     */
+    private void validateChecksum(DataInputStream in,
+                                  Checksum checksum)
+        throws IOException {
+      if (checksum != null) {
+        int calculatedChecksum = (int)checksum.getValue();
+        int readChecksum = in.readInt(); // read in checksum
+        if (readChecksum != calculatedChecksum) {
+          throw new ChecksumException(
+              "Transaction is corrupt. Calculated checksum is " +
+              calculatedChecksum + " but read checksum " + readChecksum, -1);
+        }
+      }
+    }
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1134031&r1=1134030&r2=1134031&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Thu Jun  9 18:36:16 2011
@@ -24,8 +24,11 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
@@ -75,4 +78,50 @@ public class TestFSEditLogLoader {
           e.getMessage().matches(expectedErrorMessage));
     }
   }
+  
+  /**
+   * Test that, if the NN restarts with a new minimum replication,
+   * any files created with the old replication count will get
+   * automatically bumped up to the new minimum upon restart.
+   */
+  @Test
+  public void testReplicationAdjusted() throws IOException {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    // Replicate and heartbeat fast to shave a few seconds off test
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+  
+      // Create a file with replication count 1
+      Path p = new Path("/testfile");
+      DFSTestUtil.createFile(fs, p, 10, /*repl*/ (short)1, 1);
+      DFSTestUtil.waitReplication(fs, p, (short)1);
+  
+      // Shut down and restart cluster with new minimum replication of 2
+      cluster.shutdown();
+      cluster = null;
+      
+      conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
+  
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      
+      // The file should get adjusted to replication 2 when
+      // the edit log is replayed.
+      DFSTestUtil.waitReplication(fs, p, (short)2);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }