You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2010/10/29 19:04:08 UTC

svn commit: r1028847 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: eli
Date: Fri Oct 29 17:04:07 2010
New Revision: 1028847

URL: http://svn.apache.org/viewvc?rev=1028847&view=rev
Log:
HDFS-1462. Refactor edit log loading to a separate class from edit log writing. Contributed by Todd Lipcon.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Oct 29 17:04:07 2010
@@ -157,6 +157,9 @@ Trunk (unreleased changes)
     HDFS-259. Remove intentionally corrupt 0.13 directory layout creation.
     (Todd Lipcon via eli)
 
+    HDFS-1462. Refactor edit log loading to a separate class from edit log writing.
+    (Todd Lipcon via eli)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Fri Oct 29 17:04:07 2010
@@ -43,6 +43,7 @@ public class BackupStorage extends FSIma
 
   /** Backup input stream for loading edits into memory */
   private EditLogBackupInputStream backupInputStream;
+
   /** Is journal spooling in progress */
   volatile JSpoolState jsState;
 
@@ -214,7 +215,8 @@ public class BackupStorage extends FSIma
           waitSpoolEnd();
           // update NameSpace in memory
           backupInputStream.setBytes(data);
-          editLog.loadEditRecords(getLayoutVersion(),
+          FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+          logLoader.loadEditRecords(getLayoutVersion(),
                     backupInputStream.getDataInputStream(), true);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
@@ -334,11 +336,12 @@ public class BackupStorage extends FSIma
       // load edits.new
       EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
       DataInputStream in = edits.getDataInputStream();
-      numEdits += editLog.loadFSEdits(in, false);
-  
+      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      numEdits += logLoader.loadFSEdits(in, false);
+
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      numEdits += editLog.loadEditRecords(getLayoutVersion(), in, true);
+      numEdits += logLoader.loadEditRecords(getLayoutVersion(), in, true);
       getFSNamesystem().dir.updateCountForINodeWithQuota();
       edits.close();
     }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Oct 29 17:04:07 2010
@@ -147,7 +147,7 @@ class EditLogBackupOutputStream extends 
       JournalRecord jRec = null;
       for(; idx < bufReadySize; idx++) {
         jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLog.OP_JSPOOL_START)
+        if(jRec.op >= FSEditLog.Ops.OP_JSPOOL_START)
           break;  // special operation should be sent in a separate call to BN
         jRec.write(out);
       }
@@ -177,7 +177,7 @@ class EditLogBackupOutputStream extends 
   private void send(int ja) throws IOException {
     try {
       int length = out.getLength();
-      out.write(FSEditLog.OP_INVALID);
+      out.write(FSEditLog.Ops.OP_INVALID);
       backupNode.journal(nnRegistration, ja, length, out.getData());
     } finally {
       out.reset();

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Oct 29 17:04:07 2010
@@ -128,7 +128,7 @@ class EditLogFileOutputStream extends Ed
   @Override
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    write(FSEditLog.OP_INVALID); // insert end-of-file marker
+    write(FSEditLog.Ops.OP_INVALID); // insert end-of-file marker
     DataOutputBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
@@ -189,7 +189,7 @@ class EditLogFileOutputStream extends Ed
    */
   @Override
   boolean isOperationSupported(byte op) {
-    return op < FSEditLog.OP_JSPOOL_START - 1;
+    return op < FSEditLog.Ops.OP_JSPOOL_START - 1;
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 29 17:04:07 2010
@@ -17,28 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -53,8 +45,6 @@ import org.apache.hadoop.io.ArrayWritabl
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 /**
@@ -64,45 +54,50 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSEditLog {
-  public  static final byte OP_INVALID = -1;
-  private static final byte OP_ADD = 0;
-  private static final byte OP_RENAME_OLD = 1;  // rename
-  private static final byte OP_DELETE = 2;  // delete
-  private static final byte OP_MKDIR = 3;   // create directory
-  private static final byte OP_SET_REPLICATION = 4; // set replication
-  //the following two are used only for backward compatibility :
-  @Deprecated private static final byte OP_DATANODE_ADD = 5;
-  @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
-  private static final byte OP_SET_PERMISSIONS = 7;
-  private static final byte OP_SET_OWNER = 8;
-  private static final byte OP_CLOSE = 9;    // close after write
-  private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
-  /* The following two are not used any more. Should be removed once
-   * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
-  private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
-  private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
-  private static final byte OP_TIMES = 13; // sets mod & access time on a file
-  private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
-  private static final byte OP_RENAME = 15;  // new rename
-  private static final byte OP_CONCAT_DELETE = 16; // concat files.
-  private static final byte OP_SYMLINK = 17; // a symbolic link
-  private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
-  private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
-  private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
-  private static final byte OP_UPDATE_MASTER_KEY = 21; //update master key
-
-  /* 
-   * The following operations are used to control remote edit log streams,
-   * and not logged into file streams.
-   */
-  static final byte OP_JSPOOL_START = // start journal spool
-                                    NamenodeProtocol.JA_JSPOOL_START;
-  static final byte OP_CHECKPOINT_TIME = // incr checkpoint time
-                                    NamenodeProtocol.JA_CHECKPOINT_TIME;
+  
+  abstract static class Ops {
+    public  static final byte OP_INVALID = -1;
+    public static final byte OP_ADD = 0;
+    public static final byte OP_RENAME_OLD = 1;  // rename
+    public static final byte OP_DELETE = 2;  // delete
+    public static final byte OP_MKDIR = 3;   // create directory
+    public static final byte OP_SET_REPLICATION = 4; // set replication
+    //the following two are used only for backward compatibility :
+    @Deprecated public static final byte OP_DATANODE_ADD = 5;
+    @Deprecated public static final byte OP_DATANODE_REMOVE = 6;
+    public static final byte OP_SET_PERMISSIONS = 7;
+    public static final byte OP_SET_OWNER = 8;
+    public static final byte OP_CLOSE = 9;    // close after write
+    public static final byte OP_SET_GENSTAMP = 10;    // store genstamp
+    /* The following two are not used any more. Should be removed once
+     * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
+    public static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
+    public static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
+    public static final byte OP_TIMES = 13; // sets mod & access time on a file
+    public static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+    public static final byte OP_RENAME = 15;  // new rename
+    public static final byte OP_CONCAT_DELETE = 16; // concat files.
+    public static final byte OP_SYMLINK = 17; // a symbolic link
+    public static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
+    public static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
+    public static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
+    public static final byte OP_UPDATE_MASTER_KEY = 21; //update master key
+  
+    /* 
+     * The following operations are used to control remote edit log streams,
+     * and not logged into file streams.
+     */
+    static final byte OP_JSPOOL_START = // start journal spool
+                                      NamenodeProtocol.JA_JSPOOL_START;
+    static final byte OP_CHECKPOINT_TIME = // incr checkpoint time
+                                      NamenodeProtocol.JA_CHECKPOINT_TIME;
+  }
 
   static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
       " File system changes are not persistent. No journal streams.";
 
+  private static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
   private volatile int sizeOutputFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -198,7 +193,7 @@ public class FSEditLog {
       try {
         addNewEditLogStream(eFile);
       } catch (IOException e) {
-        FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
+        LOG.warn("Unable to open edit log file " + eFile);
         // Remove the directory from list of storage directories
         if(al == null) al = new ArrayList<StorageDirectory>(1);
         al.add(sd);
@@ -243,7 +238,7 @@ public class FSEditLog {
       try {
         closeStream(eStream);
       } catch (IOException e) {
-        FSNamesystem.LOG.warn("FSEditLog:close - failed to close stream " 
+        LOG.warn("FSEditLog:close - failed to close stream " 
             + eStream.getName());
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
@@ -281,17 +276,17 @@ public class FSEditLog {
     }
 
     String lsd = fsimage.listStorageDirectories();
-    FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
+    LOG.info("current list of storage dirs:" + lsd);
 
     ArrayList<StorageDirectory> al = null;
     for (EditLogOutputStream eStream : errorStreams) {
-      FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName()
+      LOG.error("Unable to log edits to " + eStream.getName()
           + "; removing it");     
 
       StorageDirectory storageDir;
       if(propagate && eStream.getType() == JournalType.FILE && //find SD
           (storageDir = getStorage(eStream)) != null) {
-        FSNamesystem.LOG.info("about to remove corresponding storage:" 
+        LOG.info("about to remove corresponding storage:" 
             + storageDir.getRoot().getAbsolutePath());
         // remove corresponding storage dir
         if(al == null) al = new ArrayList<StorageDirectory>(1);
@@ -303,7 +298,7 @@ public class FSEditLog {
         if (es == eStream) {  
           try { eStream.close(); } catch (IOException e) {
             // nothing to do.
-            FSNamesystem.LOG.warn("Failed to close eStream " + eStream.getName()
+            LOG.warn("Failed to close eStream " + eStream.getName()
                 + " before removing it (might be ok)");
           }
           ies.remove();
@@ -314,7 +309,7 @@ public class FSEditLog {
     
     if (editStreams == null || editStreams.size() <= 0) {
       String msg = "Fatal Error: All storage directories are inaccessible.";
-      FSNamesystem.LOG.fatal(msg, new IOException(msg)); 
+      LOG.fatal(msg, new IOException(msg)); 
       Runtime.getRuntime().exit(-1);
     }
 
@@ -325,7 +320,7 @@ public class FSEditLog {
     if(propagate) incrementCheckpointTime();
     
     lsd = fsimage.listStorageDirectories();
-    FSNamesystem.LOG.info("at the end current list of storage dirs:" + lsd);
+    LOG.info("at the end current list of storage dirs:" + lsd);
   }
 
 
@@ -341,7 +336,7 @@ public class FSEditLog {
     Iterator<StorageDirectory> it = fsimage.dirIterator(); 
     while (it.hasNext()) {
       StorageDirectory sd = it.next();
-      FSNamesystem.LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath()); 
+      LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath()); 
       if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
         return sd;
     }
@@ -371,491 +366,8 @@ public class FSEditLog {
   }
 
   /**
-   * Load an edit log, and apply the changes to the in-memory structure
-   * This is where we apply edits that we've been writing to disk all
-   * along.
-   */
-  int loadFSEdits(EditLogInputStream edits) throws IOException {
-    DataInputStream in = edits.getDataInputStream();
-    long startTime = now();
-    int numEdits = loadFSEdits(in, true);
-    FSImage.LOG.info("Edits file " + edits.getName() 
-        + " of size " + edits.length() + " edits # " + numEdits 
-        + " loaded in " + (now()-startTime)/1000 + " seconds.");
-    return numEdits;
-  }
-
-  int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
-    int numEdits = 0;
-    int logVersion = 0;
-
-    try {
-      // Read log file version. Could be missing. 
-      in.mark(4);
-      // If edits log is greater than 2G, available method will return negative
-      // numbers, so we avoid having to call available
-      boolean available = true;
-      try {
-        logVersion = in.readByte();
-      } catch (EOFException e) {
-        available = false;
-      }
-      if (available) {
-        in.reset();
-        logVersion = in.readInt();
-        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-          throw new IOException(
-                          "Unexpected version of the file system log file: "
-                          + logVersion + ". Current version = " 
-                          + FSConstants.LAYOUT_VERSION + ".");
-      }
-      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-                            "Unsupported version " + logVersion;
-      numEdits = loadEditRecords(logVersion, in, false);
-    } finally {
-      if(closeOnExit)
-        in.close();
-    }
-    if (logVersion != FSConstants.LAYOUT_VERSION) // other version
-      numEdits++; // save this image asap
-    return numEdits;
-  }
-
-  @SuppressWarnings("deprecation")
-  int loadEditRecords(int logVersion, DataInputStream in,
-      boolean closeOnExit) throws IOException {
-    FSNamesystem fsNamesys = fsimage.getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
-    int numEdits = 0;
-    String clientName = null;
-    String clientMachine = null;
-    String path = null;
-    int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
-        numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
-        numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, 
-        numOpSymlink = 0, numOpGetDelegationToken = 0,
-        numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
-        numOpUpdateMasterKey = 0, numOpOther = 0;
-
-    try {
-      while (true) {
-        long timestamp = 0;
-        long mtime = 0;
-        long atime = 0;
-        long blockSize = 0;
-        byte opcode = -1;
-        try {
-          in.mark(1);
-          opcode = in.readByte();
-          if (opcode == OP_INVALID) {
-            in.reset(); // reset back to end of file if somebody reads it again
-            break; // no more transactions
-          }
-        } catch (EOFException e) {
-          break; // no more transactions
-        }
-        numEdits++;
-        switch (opcode) {
-        case OP_ADD:
-        case OP_CLOSE: {
-          // versions > 0 support per file replication
-          // get name and replication
-          int length = in.readInt();
-          if (-7 == logVersion && length != 3||
-              -17 < logVersion && logVersion < -7 && length != 4 ||
-              logVersion <= -17 && length != 5) {
-              throw new IOException("Incorrect data format."  +
-                                    " logVersion is " + logVersion +
-                                    " but writables.length is " +
-                                    length + ". ");
-          }
-          path = FSImage.readString(in);
-          short replication = adjustReplication(readShort(in));
-          mtime = readLong(in);
-          if (logVersion <= -17) {
-            atime = readLong(in);
-          }
-          if (logVersion < -7) {
-            blockSize = readLong(in);
-          }
-          // get blocks
-          boolean isFileUnderConstruction = (opcode == OP_ADD);
-          BlockInfo blocks[] = 
-            readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
-          // Older versions of HDFS does not store the block size in inode.
-          // If the file has more than one block, use the size of the
-          // first block as the blocksize. Otherwise use the default
-          // block size.
-          if (-8 <= logVersion && blockSize == 0) {
-            if (blocks.length > 1) {
-              blockSize = blocks[0].getNumBytes();
-            } else {
-              long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-              blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
-            }
-          }
-           
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-
-          // clientname, clientMachine and block locations of last block.
-          if (opcode == OP_ADD && logVersion <= -12) {
-            clientName = FSImage.readString(in);
-            clientMachine = FSImage.readString(in);
-            if (-13 <= logVersion) {
-              readDatanodeDescriptorArray(in);
-            }
-          } else {
-            clientName = "";
-            clientMachine = "";
-          }
-
-          // The open lease transaction re-creates a file if necessary.
-          // Delete the file if it already exists.
-          if (FSNamesystem.LOG.isDebugEnabled()) {
-            FSNamesystem.LOG.debug(opcode + ": " + path + 
-                                   " numblocks : " + blocks.length +
-                                   " clientHolder " +  clientName +
-                                   " clientMachine " + clientMachine);
-          }
-
-          fsDir.unprotectedDelete(path, mtime);
-
-          // add to the file tree
-          INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                    path, permissions,
-                                                    blocks, replication, 
-                                                    mtime, atime, blockSize);
-          if (isFileUnderConstruction) {
-            numOpAdd++;
-            //
-            // Replace current node with a INodeUnderConstruction.
-            // Recreate in-memory lease record.
-            //
-            INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                      node.getLocalNameBytes(),
-                                      node.getReplication(), 
-                                      node.getModificationTime(),
-                                      node.getPreferredBlockSize(),
-                                      node.getBlocks(),
-                                      node.getPermissionStatus(),
-                                      clientName, 
-                                      clientMachine, 
-                                      null);
-            fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
-          }
-          break;
-        } 
-        case OP_SET_REPLICATION: {
-          numOpSetRepl++;
-          path = FSImage.readString(in);
-          short replication = adjustReplication(readShort(in));
-          fsDir.unprotectedSetReplication(path, replication, null);
-          break;
-        } 
-        case OP_CONCAT_DELETE: {
-          if (logVersion > -22) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpConcatDelete++;
-          int length = in.readInt();
-          if (length < 3) { // trg, srcs.., timestam
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String trg = FSImage.readString(in);
-          int srcSize = length - 1 - 1; //trg and timestamp
-          String [] srcs = new String [srcSize];
-          for(int i=0; i<srcSize;i++) {
-            srcs[i]= FSImage.readString(in);
-          }
-          timestamp = readLong(in);
-          fsDir.unprotectedConcat(trg, srcs);
-          break;
-        }
-        case OP_RENAME_OLD: {
-          numOpRenameOld++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImage.readString(in);
-          String d = FSImage.readString(in);
-          timestamp = readLong(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_DELETE: {
-          numOpDelete++;
-          int length = in.readInt();
-          if (length != 2) {
-            throw new IOException("Incorrect data format. " 
-                                  + "delete operation.");
-          }
-          path = FSImage.readString(in);
-          timestamp = readLong(in);
-          fsDir.unprotectedDelete(path, timestamp);
-          break;
-        }
-        case OP_MKDIR: {
-          numOpMkDir++;
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          int length = in.readInt();
-          if (-17 < logVersion && length != 2 ||
-              logVersion <= -17 && length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          path = FSImage.readString(in);
-          timestamp = readLong(in);
-
-          // The disk format stores atimes for directories as well.
-          // However, currently this is not being updated/used because of
-          // performance reasons.
-          if (logVersion <= -17) {
-            atime = readLong(in);
-          }
-
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-          fsDir.unprotectedMkdir(path, permissions, timestamp);
-          break;
-        }
-        case OP_SET_GENSTAMP: {
-          numOpSetGenStamp++;
-          long lw = in.readLong();
-          fsNamesys.setGenerationStamp(lw);
-          break;
-        } 
-        case OP_DATANODE_ADD: {
-          numOpOther++;
-          FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
-          nodeimage.readFields(in);
-          //Datanodes are not persistent any more.
-          break;
-        }
-        case OP_DATANODE_REMOVE: {
-          numOpOther++;
-          DatanodeID nodeID = new DatanodeID();
-          nodeID.readFields(in);
-          //Datanodes are not persistent any more.
-          break;
-        }
-        case OP_SET_PERMISSIONS: {
-          numOpSetPerm++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opcode " + opcode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetPermission(
-              FSImage.readString(in), FsPermission.read(in));
-          break;
-        }
-        case OP_SET_OWNER: {
-          numOpSetOwner++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opcode " + opcode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetOwner(FSImage.readString(in),
-              FSImage.readString_EmptyAsNull(in),
-              FSImage.readString_EmptyAsNull(in));
-          break;
-        }
-        case OP_SET_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImage.readString(in), 
-                                    readLongWritable(in), 
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
-        }
-        case OP_CLEAR_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImage.readString(in),
-                                    FSConstants.QUOTA_RESET,
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
-        }
-
-        case OP_SET_QUOTA:
-          fsDir.unprotectedSetQuota(FSImage.readString(in),
-                                    readLongWritable(in),
-                                    readLongWritable(in));
-                                      
-          break;
-
-        case OP_TIMES: {
-          numOpTimes++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "times operation.");
-          }
-          path = FSImage.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          fsDir.unprotectedSetTimes(path, mtime, atime, true);
-          break;
-        }
-        case OP_SYMLINK: {
-          numOpSymlink++;
-          int length = in.readInt();
-          if (length != 4) {
-            throw new IOException("Incorrect data format. " 
-                                  + "symlink operation.");
-          }
-          path = FSImage.readString(in);
-          String value = FSImage.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          PermissionStatus perm = PermissionStatus.read(in);
-          fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
-          break;
-        }
-        case OP_RENAME: {
-          if (logVersion > -21) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpRename++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImage.readString(in);
-          String d = FSImage.readString(in);
-          timestamp = readLong(in);
-          Rename[] options = readRenameOptions(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp, options);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_GET_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpGetDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .addPersistedDelegationToken(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_RENEW_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpRenewDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_CANCEL_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpCancelDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenCancellation(delegationTokenId);
-          break;
-        }
-        case OP_UPDATE_MASTER_KEY: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opcode " + opcode
-                + " for version " + logVersion);
-          }
-          numOpUpdateMasterKey++;
-          DelegationKey delegationKey = new DelegationKey();
-          delegationKey.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
-              delegationKey);
-          break;
-        }
-        default: {
-          throw new IOException("Never seen opcode " + opcode);
-        }
-        }
-      }
-    } finally {
-      if(closeOnExit)
-        in.close();
-    }
-    if (FSImage.LOG.isDebugEnabled()) {
-      FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
-          + " numOpDelete = " + numOpDelete 
-          + " numOpRenameOld = " + numOpRenameOld 
-          + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
-          + " numOpSetPerm = " + numOpSetPerm 
-          + " numOpSetOwner = " + numOpSetOwner
-          + " numOpSetGenStamp = " + numOpSetGenStamp 
-          + " numOpTimes = " + numOpTimes
-          + " numOpConcatDelete  = " + numOpConcatDelete
-          + " numOpRename = " + numOpRename
-          + " numOpGetDelegationToken = " + numOpGetDelegationToken
-          + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
-          + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
-          + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
-          + " numOpOther = " + numOpOther);
-    }
-    return numEdits;
-  }
-
-  // a place holder for reading a long
-  private static final LongWritable longWritable = new LongWritable();
-
-  /** Read an integer from an input stream */
-  private static long readLongWritable(DataInputStream in) throws IOException {
-    synchronized (longWritable) {
-      longWritable.readFields(in);
-      return longWritable.get();
-    }
-  }
-  
-  short adjustReplication(short replication) {
-    FSNamesystem fsNamesys = fsimage.getFSNamesystem();
-    short minReplication = fsNamesys.getMinReplication();
-    if (replication<minReplication) {
-      replication = minReplication;
-    }
-    short maxReplication = fsNamesys.getMaxReplication();
-    if (replication>maxReplication) {
-      replication = maxReplication;
-    }
-    return replication;
-  }
-
-  /**
-   * Write an operation to the edit log. 
-   * Automatically sync buffered edits to persistent store if it is time
-   * to sync.
+   * Write an operation to the edit log. Do not sync to persistent
+   * store yet.
    */
   void logEdit(byte op, Writable ... writables) {
     synchronized (this) {
@@ -867,16 +379,12 @@ public class FSEditLog {
       ArrayList<EditLogOutputStream> errorStreams = null;
       long start = now();
       for(EditLogOutputStream eStream : editStreams) {
-        if(FSImage.LOG.isDebugEnabled()) {
-          FSImage.LOG.debug("loggin edits into " + eStream.getName() +
-              " stream");
-        }
         if(!eStream.isOperationSupported(op))
           continue;
         try {
           eStream.write(op, writables);
         } catch (IOException ie) {
-          FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
+          LOG.error("logEdit: removing "+ eStream.getName(), ie);
           if(errorStreams == null)
             errorStreams = new ArrayList<EditLogOutputStream>(1);
           errorStreams.add(eStream);
@@ -1041,7 +549,7 @@ public class FSEditLog {
             eStream.setReadyToFlush();
             streams.add(eStream);
           } catch (IOException ie) {
-            FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
+            LOG.error("Unable to get ready to flush.", ie);
             //
             // remember the streams that encountered an error.
             //
@@ -1063,7 +571,7 @@ public class FSEditLog {
         try {
           eStream.flush();
         } catch (IOException ie) {
-          FSNamesystem.LOG.error("Unable to sync edit log.", ie);
+          LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
@@ -1119,7 +627,7 @@ public class FSEditLog {
       buf.append(eStream.getTotalSyncTime());
       buf.append(" ");
     }
-    FSNamesystem.LOG.info(buf);
+    LOG.info(buf);
   }
 
   /** 
@@ -1134,7 +642,7 @@ public class FSEditLog {
       FSEditLog.toLogLong(newNode.getModificationTime()),
       FSEditLog.toLogLong(newNode.getAccessTime()),
       FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_ADD,
+    logEdit(Ops.OP_ADD,
             new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair), 
             new ArrayWritable(Block.class, newNode.getBlocks()),
             newNode.getPermissionStatus(),
@@ -1152,7 +660,7 @@ public class FSEditLog {
       FSEditLog.toLogLong(newNode.getModificationTime()),
       FSEditLog.toLogLong(newNode.getAccessTime()),
       FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_CLOSE,
+    logEdit(Ops.OP_CLOSE,
             new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
             new ArrayWritable(Block.class, newNode.getBlocks()),
             newNode.getPermissionStatus());
@@ -1167,7 +675,7 @@ public class FSEditLog {
       FSEditLog.toLogLong(newNode.getModificationTime()),
       FSEditLog.toLogLong(newNode.getAccessTime())
     };
-    logEdit(OP_MKDIR, new ArrayWritable(DeprecatedUTF8.class, info),
+    logEdit(Ops.OP_MKDIR, new ArrayWritable(DeprecatedUTF8.class, info),
         newNode.getPermissionStatus());
   }
   
@@ -1180,7 +688,7 @@ public class FSEditLog {
       new DeprecatedUTF8(src),
       new DeprecatedUTF8(dst),
       FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(Ops.OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
   }
   
   /** 
@@ -1191,7 +699,7 @@ public class FSEditLog {
       new DeprecatedUTF8(src),
       new DeprecatedUTF8(dst),
       FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
+    logEdit(Ops.OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
         toBytesWritable(options));
   }
   
@@ -1199,7 +707,7 @@ public class FSEditLog {
    * Add set replication record to edit log
    */
   void logSetReplication(String src, short replication) {
-    logEdit(OP_SET_REPLICATION, 
+    logEdit(Ops.OP_SET_REPLICATION, 
             new DeprecatedUTF8(src), 
             FSEditLog.toLogReplication(replication));
   }
@@ -1210,20 +718,20 @@ public class FSEditLog {
    * @param quota the directory size limit
    */
   void logSetQuota(String src, long nsQuota, long dsQuota) {
-    logEdit(OP_SET_QUOTA, new DeprecatedUTF8(src), 
+    logEdit(Ops.OP_SET_QUOTA, new DeprecatedUTF8(src), 
             new LongWritable(nsQuota), new LongWritable(dsQuota));
   }
 
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
-    logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+    logEdit(Ops.OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
   }
 
   /**  Add set owner record to edit log */
   void logSetOwner(String src, String username, String groupname) {
     DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
     DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
-    logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+    logEdit(Ops.OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
   }
   
   /**
@@ -1238,7 +746,7 @@ public class FSEditLog {
       info[idx++] = new DeprecatedUTF8(srcs[i]);
     }
     info[idx] = FSEditLog.toLogLong(timestamp);
-    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(Ops.OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
   }
   
   /** 
@@ -1248,14 +756,14 @@ public class FSEditLog {
     DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
       new DeprecatedUTF8(src),
       FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(Ops.OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
   }
 
   /** 
    * Add generation stamp record to edit log
    */
   void logGenerationStamp(long genstamp) {
-    logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+    logEdit(Ops.OP_SET_GENSTAMP, new LongWritable(genstamp));
   }
 
   /** 
@@ -1266,7 +774,7 @@ public class FSEditLog {
       new DeprecatedUTF8(src),
       FSEditLog.toLogLong(mtime),
       FSEditLog.toLogLong(atime)};
-    logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(Ops.OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
   }
 
   /** 
@@ -1279,7 +787,7 @@ public class FSEditLog {
       new DeprecatedUTF8(value),
       FSEditLog.toLogLong(mtime),
       FSEditLog.toLogLong(atime)};
-    logEdit(OP_SYMLINK, 
+    logEdit(Ops.OP_SYMLINK, 
             new ArrayWritable(DeprecatedUTF8.class, info),
             node.getPermissionStatus());
   }
@@ -1292,20 +800,20 @@ public class FSEditLog {
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    logEdit(Ops.OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
   }
   
   void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    logEdit(Ops.OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
   }
   
   void logCancelDelegationToken(DelegationTokenIdentifier id) {
-    logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+    logEdit(Ops.OP_CANCEL_DELEGATION_TOKEN, id);
   }
   
   void logUpdateMasterKey(DelegationKey key) {
-    logEdit(OP_UPDATE_MASTER_KEY, key);
+    logEdit(Ops.OP_UPDATE_MASTER_KEY, key);
   }
   
   static private DeprecatedUTF8 toLogReplication(short replication) {
@@ -1332,7 +840,7 @@ public class FSEditLog {
           "Wrong streams size";
         size = Math.max(size, curSize);
       } catch (IOException e) {
-        FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
+        LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
             idx + ") " + es.getName());
         if(al==null) al = new ArrayList<EditLogOutputStream>(1);
         al.add(es);
@@ -1403,7 +911,7 @@ public class FSEditLog {
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
-        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
+        LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1482,7 +990,7 @@ public class FSEditLog {
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
-        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
+        LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1520,84 +1028,6 @@ public class FSEditLog {
     sizeOutputFlushBuffer = size;
   }
 
-  /**
-   * A class to read in blocks stored in the old format. The only two
-   * fields in the block were blockid and length.
-   */
-  static class BlockTwo implements Writable {
-    long blkid;
-    long len;
-
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (BlockTwo.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new BlockTwo(); }
-         });
-    }
-
-
-    BlockTwo() {
-      blkid = 0;
-      len = 0;
-    }
-    /////////////////////////////////////
-    // Writable
-    /////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-      out.writeLong(blkid);
-      out.writeLong(len);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      this.blkid = in.readLong();
-      this.len = in.readLong();
-    }
-  }
-
-  /** This method is defined for compatibility reason. */
-  static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
-      ) throws IOException {
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = new DatanodeDescriptor();
-      locations[i].readFieldsFromFSEditLog(in);
-    }
-    return locations;
-  }
-
-  static private short readShort(DataInputStream in) throws IOException {
-    return Short.parseShort(FSImage.readString(in));
-  }
-
-  static private long readLong(DataInputStream in) throws IOException {
-    return Long.parseLong(FSImage.readString(in));
-  }
-
-  static private BlockInfo[] readBlocks(
-      DataInputStream in,
-      int logVersion,
-      boolean isFileUnderConstruction,
-      short replication) throws IOException {
-    int numBlocks = in.readInt();
-    BlockInfo[] blocks = new BlockInfo[numBlocks];
-    Block blk = new Block();
-    BlockTwo oldblk = new BlockTwo();
-    for (int i = 0; i < numBlocks; i++) {
-      if (logVersion <= -14) {
-        blk.readFields(in);
-      } else {
-        oldblk.readFields(in);
-        blk.set(oldblk.blkid, oldblk.len,
-                GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-      }
-      if(isFileUnderConstruction && i == numBlocks-1)
-        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
-      else
-        blocks[i] = new BlockInfo(blk, replication);
-    }
-    return blocks;
-  }
 
   boolean isEmpty() throws IOException {
     return getEditLogSize() <= 0;
@@ -1632,7 +1062,7 @@ public class FSEditLog {
       boStream = new EditLogBackupOutputStream(bnReg, nnReg);
       editStreams.add(boStream);
     }
-    logEdit(OP_JSPOOL_START, (Writable[])null);
+    logEdit(Ops.OP_JSPOOL_START, (Writable[])null);
   }
 
   /**
@@ -1648,7 +1078,7 @@ public class FSEditLog {
       try {
         eStream.write(data, 0, length);
       } catch (IOException ie) {
-        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), ie);
+        LOG.warn("Error in editStream " + eStream.getName(), ie);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1730,7 +1160,7 @@ public class FSEditLog {
   void incrementCheckpointTime() {
     fsimage.incrementCheckpointTime();
     Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
-    logEdit(OP_CHECKPOINT_TIME, args);
+    logEdit(Ops.OP_CHECKPOINT_TIME, args);
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
@@ -1781,19 +1211,6 @@ public class FSEditLog {
     return regAllowed;
   }
   
-  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
-    BytesWritable writable = new BytesWritable();
-    writable.readFields(in);
-    
-    byte[] bytes = writable.getBytes();
-    Rename[] options = new Rename[bytes.length];
-    
-    for (int i = 0; i < bytes.length; i++) {
-      options[i] = Rename.valueOf(bytes[i]);
-    }
-    return options;
-  }
-  
   static BytesWritable toBytesWritable(Options.Rename... options) {
     byte[] bytes = new byte[options.length];
     for (int i = 0; i < options.length; i++) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Oct 29 17:04:07 2010
@@ -1142,7 +1142,7 @@ public class FSImage extends Storage {
         long blockSize = 0;
         pathComponents = readPathComponents(in);
         replication = in.readShort();
-        replication = editLog.adjustReplication(replication);
+        replication = fsNamesys.adjustReplication(replication);
         modificationTime = in.readLong();
         if (imgVersion <= -17) {
           atime = in.readLong();
@@ -1283,17 +1283,19 @@ public class FSImage extends Storage {
    * @throws IOException
    */
   int loadFSEdits(StorageDirectory sd) throws IOException {
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+    
     int numEdits = 0;
     EditLogFileInputStream edits = 
       new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
     
-    numEdits = editLog.loadFSEdits(edits);
+    numEdits = loader.loadFSEdits(edits);
     edits.close();
     File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
     
     if (editsNew.exists() && editsNew.length() > 0) {
       edits = new EditLogFileInputStream(editsNew);
-      numEdits += editLog.loadFSEdits(edits);
+      numEdits += loader.loadFSEdits(edits);
       edits.close();
     }
     

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 29 17:04:07 2010
@@ -3470,6 +3470,22 @@ public class FSNamesystem implements FSC
   short getMaxReplication()     { return (short)blockManager.maxReplication; }
   short getMinReplication()     { return (short)blockManager.minReplication; }
   short getDefaultReplication() { return (short)blockManager.defaultReplication; }
+
+  /**
+   * Clamp the specified replication between the minimum and maximum
+   * replication levels for this namesystem.
+   */
+  short adjustReplication(short replication) {
+    short minReplication = getMinReplication();
+    if (replication < minReplication) {
+      replication = minReplication;
+    }
+    short maxReplication = getMaxReplication();
+    if (replication > maxReplication) {
+      replication = maxReplication;
+    }
+    return replication;
+  }
     
   /**
    * A immutable object that stores the number of live replicas and

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Oct 29 17:04:07 2010
@@ -138,11 +138,12 @@ public class TestEditLog extends TestCas
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
         File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
-        int numEdits = namesystem.getEditLog().loadFSEdits(
+        int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
         int numLeases = namesystem.leaseManager.countLease();
         System.out.println("Number of outstanding leases " + numLeases);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Fri Oct 29 17:04:07 2010
@@ -221,7 +221,7 @@ public class TestEditLogRace {
            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
       System.out.println("Verifying file: " + editFile);
-      int numEdits = namesystem.getEditLog().loadFSEdits(
+      int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
         new EditLogFileInputStream(editFile));
       System.out.println("Number of edits: " + numEdits);
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1028847&r1=1028846&r2=1028847&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Fri Oct 29 17:04:07 2010
@@ -134,13 +134,14 @@ public class TestSecurityTokenEditLog ex
       // If there were any corruptions, it is likely that the reading in
       // of these transactions will throw an exception.
       //
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       namesystem.getDelegationTokenSecretManager().stopThreads();
       int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
       for (Iterator<StorageDirectory> it = 
               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
         File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
-        int numEdits = namesystem.getEditLog().loadFSEdits(
+        int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
         assertTrue("Verification for " + editFile + " failed. " +
                    "Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + numKeys) + " transactions. "+