You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 22:46:59 UTC

svn commit: r1151238 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/namenode/

Author: todd
Date: Tue Jul 26 20:46:58 2011
New Revision: 1151238

URL: http://svn.apache.org/viewvc?rev=1151238&view=rev
Log:
HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp implementations. Contributed by Ivan Kelly.

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jul 26 20:46:58 2011
@@ -601,6 +601,9 @@ Trunk (unreleased changes)
 
     HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
 
+    HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
+    implementations. (Ivan Kelly via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Tue Jul 26 20:46:58 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,25 +46,18 @@ class EditLogBackupOutputStream extends 
   private NamenodeProtocol backupNode;          // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
-  private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
-  private ArrayList<JournalRecord> bufReady;    // buffer ready for flushing
+  private ArrayList<BufferedOp> bufCurrent;  // current buffer for writing
+  private ArrayList<BufferedOp> bufReady;    // buffer ready for flushing
   private DataOutputBuffer out;     // serialized output sent to backup node
 
-  static class JournalRecord {
-    byte op;
-    Writable[] args;
-
-    JournalRecord(byte op, Writable ... writables) {
-      this.op = op;
-      this.args = writables;
-    }
-
-    void write(DataOutputStream out) throws IOException {
-      out.write(op);
-      if(args == null)
-        return;
-      for(Writable w : args)
-        w.write(out);
+  
+  private static class BufferedOp { 
+    public final FSEditLogOpCodes opCode;
+    public final byte[] bytes;
+
+    public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
+      this.opCode = opCode;
+      this.bytes = bytes;
     }
   }
 
@@ -84,8 +78,8 @@ class EditLogBackupOutputStream extends 
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
     }
-    this.bufCurrent = new ArrayList<JournalRecord>();
-    this.bufReady = new ArrayList<JournalRecord>();
+    this.bufCurrent = new ArrayList<BufferedOp>();
+    this.bufReady = new ArrayList<BufferedOp>();
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
 
@@ -100,13 +94,18 @@ class EditLogBackupOutputStream extends 
   }
 
   @Override // EditLogOutputStream
-  public void write(int b) throws IOException {
-    throw new IOException("Not implemented");
+  void write(FSEditLogOp op) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream s = new DataOutputStream(baos);
+    FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
+    w.writeOp(op);
+
+    bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
   }
 
-  @Override // EditLogOutputStream
-  void write(byte op, Writable ... writables) throws IOException {
-    bufCurrent.add(new JournalRecord(op, writables));
+  @Override
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    throw new IOException("Not supported");
   }
 
   /**
@@ -134,7 +133,7 @@ class EditLogBackupOutputStream extends 
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    ArrayList<JournalRecord>  tmp = bufReady;
+    ArrayList<BufferedOp>  tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
   }
@@ -144,12 +143,13 @@ class EditLogBackupOutputStream extends 
     assert out.size() == 0 : "Output buffer is not empty";
     int bufReadySize = bufReady.size();
     for(int idx = 0; idx < bufReadySize; idx++) {
-      JournalRecord jRec = null;
+      BufferedOp jRec = null;
       for(; idx < bufReadySize; idx++) {
         jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
+        if(jRec.opCode.getOpCode() 
+           >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
           break;  // special operation should be sent in a separate call to BN
-        jRec.write(out);
+        out.write(jRec.bytes, 0, jRec.bytes.length);
       }
       if(out.size() > 0)
         send(NamenodeProtocol.JA_JOURNAL);
@@ -157,8 +157,8 @@ class EditLogBackupOutputStream extends 
         break;
       // operation like start journal spool or increment checkpoint time
       // is a separate call to BN
-      jRec.write(out);
-      send(jRec.op);
+      out.write(jRec.bytes, 0, jRec.bytes.length);
+      send(jRec.opCode.getOpCode());
     }
     bufReady.clear();         // erase all data in the buffer
     out.reset();              // reset buffer to the start position

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Tue Jul 26 20:46:58 2011
@@ -45,6 +45,7 @@ class EditLogFileOutputStream extends Ed
   private FileChannel fc; // channel of the file stream for sync
   private DataOutputBuffer bufCurrent; // current buffer for writing
   private DataOutputBuffer bufReady; // buffer ready for flushing
+  private FSEditLogOp.Writer writer;
   final private int initBufferSize; // inital buffer size
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
 
@@ -70,6 +71,7 @@ class EditLogFileOutputStream extends Ed
     initBufferSize = size;
     bufCurrent = new DataOutputBuffer(size);
     bufReady = new DataOutputBuffer(size);
+    writer = new FSEditLogOp.Writer(bufCurrent);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     fp = new FileOutputStream(rp.getFD()); // open for append
     fc = rp.getChannel();
@@ -88,18 +90,11 @@ class EditLogFileOutputStream extends Ed
 
   /** {@inheritDoc} */
   @Override
-  public void write(int b) throws IOException {
-    bufCurrent.write(b);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  void write(byte op, Writable... writables) throws IOException {
+  void write(FSEditLogOp op) throws IOException {
     int start = bufCurrent.getLength();
-    write(op);
-    for (Writable w : writables) {
-      w.write(bufCurrent);
-    }
+    
+    writer.writeOp(op);
+
     // write transaction checksum
     int end = bufCurrent.getLength();
     Checksum checksum = FSEditLog.getChecksum();
@@ -109,6 +104,12 @@ class EditLogFileOutputStream extends Ed
     bufCurrent.writeInt(sum);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    bufCurrent.write(bytes, offset, length);
+  }
+
   /**
    * Create empty edits logs file.
    */
@@ -136,6 +137,7 @@ class EditLogFileOutputStream extends Ed
         }
         bufCurrent.close();
         bufCurrent = null;
+        writer = null;
       }
   
       if(bufReady != null) {
@@ -156,6 +158,7 @@ class EditLogFileOutputStream extends Ed
     } finally {
       IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
       bufCurrent = bufReady = null;
+      writer = null;
       fc = null;
       fp = null;
     }
@@ -168,10 +171,11 @@ class EditLogFileOutputStream extends Ed
   @Override
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+    bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
     DataOutputBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
+    writer = new FSEditLogOp.Writer(bufCurrent);
   }
 
   /**

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Tue Jul 26 20:46:58 2011
@@ -18,17 +18,14 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.io.Writable;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
  */
-abstract class EditLogOutputStream extends OutputStream 
-implements JournalStream {
+abstract class EditLogOutputStream implements JournalStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
@@ -37,19 +34,27 @@ implements JournalStream {
     numSync = totalTimeSync = 0;
   }
 
-  /** {@inheritDoc} */
-  abstract public void write(int b) throws IOException;
-
   /**
-   * Write edits log record into the stream.
-   * The record is represented by operation name and
-   * an array of Writable arguments.
+   * Write edits log operation to the stream.
    * 
    * @param op operation
-   * @param writables array of Writable arguments
    * @throws IOException
    */
-  abstract void write(byte op, Writable ... writables) throws IOException;
+  abstract void write(FSEditLogOp op) throws IOException;
+
+  /**
+   * Write raw data to an edit log. This data should already have
+   * the transaction ID, checksum, etc included. It is for use
+   * within the BackupNode when replicating edits from the
+   * NameNode.
+   *
+   * @param bytes the bytes to write.
+   * @param offset offset in the bytes to write from
+   * @param length number of bytes to write
+   * @throws IOException
+   */
+  abstract void writeRaw(byte[] bytes, int offset, int length)
+      throws IOException;
 
   /**
    * Create and initialize underlying persistent edits log storage.

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jul 26 20:46:58 2011
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.IOException;
+import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
+import java.util.zip.CheckedOutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -43,14 +44,13 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -319,7 +319,7 @@ public class FSEditLog implements NNStor
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
+  void logEdit(FSEditLogOp op) {
     synchronized (this) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
@@ -329,10 +329,10 @@ public class FSEditLog implements NNStor
       ArrayList<EditLogOutputStream> errorStreams = null;
       long start = now();
       for(EditLogOutputStream eStream : editStreams) {
-        if(!eStream.isOperationSupported(opCode.getOpCode()))
+        if(!eStream.isOperationSupported(op.opCode.getOpCode()))
           continue;
         try {
-          eStream.write(opCode.getOpCode(), writables);
+          eStream.write(op);
         } catch (IOException ie) {
           LOG.error("logEdit: removing "+ eStream.getName(), ie);
           if(errorStreams == null)
@@ -585,49 +585,45 @@ public class FSEditLog implements NNStor
    * Records the block locations of the last block.
    */
   public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(path), 
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_ADD,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair), 
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus(),
-            new DeprecatedUTF8(newNode.getClientName()),
-            new DeprecatedUTF8(newNode.getClientMachine()));
+    AddOp op = AddOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus())
+      .setClientName(newNode.getClientName())
+      .setClientMachine(newNode.getClientMachine());
+    
+      logEdit(op);
   }
 
   /** 
    * Add close lease record to edit log.
    */
   public void logCloseFile(String path, INodeFile newNode) {
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_CLOSE,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus());
+    CloseOp op = CloseOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus());
+    
+    logEdit(op);
   }
   
   /** 
    * Add create directory record to edit log
    */
   public void logMkDir(String path, INode newNode) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime())
-    };
-    logEdit(OP_MKDIR,
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      newNode.getPermissionStatus());
+    MkdirOp op = MkdirOp.getInstance()
+      .setPath(path)
+      .setTimestamp(newNode.getModificationTime())
+      .setPermissionStatus(newNode.getPermissionStatus());
+    logEdit(op);
   }
   
   /** 
@@ -635,33 +631,33 @@ public class FSEditLog implements NNStor
    * TODO: use String parameters until just before writing to disk
    */
   void logRename(String src, String dst, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+    RenameOldOp op = RenameOldOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
   
   /** 
    * Add rename record to edit log
    */
   void logRename(String src, String dst, long timestamp, Options.Rename... options) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME,
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      toBytesWritable(options));
+    RenameOp op = RenameOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp)
+      .setOptions(options);
+    logEdit(op);
   }
   
   /** 
    * Add set replication record to edit log
    */
   void logSetReplication(String src, short replication) {
-    logEdit(OP_SET_REPLICATION, 
-      new DeprecatedUTF8(src), 
-      FSEditLog.toLogReplication(replication));
+    SetReplicationOp op = SetReplicationOp.getInstance()
+      .setPath(src)
+      .setReplication(replication);
+    logEdit(op);
   }
   
   /** Add set namespace quota record to edit log
@@ -670,64 +666,69 @@ public class FSEditLog implements NNStor
    * @param quota the directory size limit
    */
   void logSetQuota(String src, long nsQuota, long dsQuota) {
-    logEdit(OP_SET_QUOTA,
-      new DeprecatedUTF8(src), 
-      new LongWritable(nsQuota), new LongWritable(dsQuota));
+    SetQuotaOp op = SetQuotaOp.getInstance()
+      .setSource(src)
+      .setNSQuota(nsQuota)
+      .setDSQuota(dsQuota);
+    logEdit(op);
   }
 
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
-    logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+    SetPermissionsOp op = SetPermissionsOp.getInstance()
+      .setSource(src)
+      .setPermissions(permissions);
+    logEdit(op);
   }
 
   /**  Add set owner record to edit log */
   void logSetOwner(String src, String username, String groupname) {
-    DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
-    DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
-    logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+    SetOwnerOp op = SetOwnerOp.getInstance()
+      .setSource(src)
+      .setUser(username)
+      .setGroup(groupname);
+    logEdit(op);
   }
   
   /**
    * concat(trg,src..) log
    */
   void logConcat(String trg, String [] srcs, long timestamp) {
-    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
-    int idx = 0;
-    info[idx++] = new DeprecatedUTF8(trg);
-    for(int i=0; i<srcs.length; i++) {
-      info[idx++] = new DeprecatedUTF8(srcs[i]);
-    }
-    info[idx] = FSEditLog.toLogLong(timestamp);
-    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+      .setTarget(trg)
+      .setSources(srcs)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
   
   /** 
    * Add delete file record to edit log
    */
   void logDelete(String src, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    DeleteOp op = DeleteOp.getInstance()
+      .setPath(src)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
 
   /** 
    * Add generation stamp record to edit log
    */
   void logGenerationStamp(long genstamp) {
-    logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+    SetGenstampOp op = SetGenstampOp.getInstance()
+      .setGenerationStamp(genstamp);
+    logEdit(op);
   }
 
   /** 
    * Add access time record to edit log
    */
   void logTimes(String src, long mtime, long atime) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+    TimesOp op = TimesOp.getInstance()
+      .setPath(src)
+      .setModificationTime(mtime)
+      .setAccessTime(atime);
+    logEdit(op);
   }
 
   /** 
@@ -735,14 +736,13 @@ public class FSEditLog implements NNStor
    */
   void logSymlink(String path, String value, long mtime, 
                   long atime, INodeSymlink node) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(path),
-      new DeprecatedUTF8(value),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_SYMLINK, 
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      node.getPermissionStatus());
+    SymlinkOp op = SymlinkOp.getInstance()
+      .setPath(path)
+      .setValue(value)
+      .setModificationTime(mtime)
+      .setAccessTime(atime)
+      .setPermissionStatus(node.getPermissionStatus());
+    logEdit(op);
   }
   
   /**
@@ -753,36 +753,40 @@ public class FSEditLog implements NNStor
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
   
   void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
   
   void logCancelDelegationToken(DelegationTokenIdentifier id) {
-    logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+    CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id);
+    logEdit(op);
   }
   
   void logUpdateMasterKey(DelegationKey key) {
-    logEdit(OP_UPDATE_MASTER_KEY, key);
+    UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+      .setDelegationKey(key);
+    logEdit(op);
   }
 
   void logReassignLease(String leaseHolder, String src, String newHolder) {
-    logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
-        new DeprecatedUTF8(src),
-        new DeprecatedUTF8(newHolder));
-  }
-  
-  static private DeprecatedUTF8 toLogReplication(short replication) {
-    return new DeprecatedUTF8(Short.toString(replication));
+    ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+      .setLeaseHolder(leaseHolder)
+      .setPath(src)
+      .setNewHolder(newHolder);
+    logEdit(op);
   }
   
-  static private DeprecatedUTF8 toLogLong(long timestamp) {
-    return new DeprecatedUTF8(Long.toString(timestamp));
-  }
-
   /**
    * Return the size of the current EditLog
    */
@@ -1030,7 +1034,7 @@ public class FSEditLog implements NNStor
       boStream = new EditLogBackupOutputStream(bnReg, nnReg);
       editStreams.add(boStream);
     }
-    logEdit(OP_JSPOOL_START, (Writable[])null);
+    logEdit(JSpoolStartOp.getInstance());
   }
 
   /**
@@ -1044,7 +1048,7 @@ public class FSEditLog implements NNStor
     long start = now();
     for(EditLogOutputStream eStream : editStreams) {
       try {
-        eStream.write(data, 0, length);
+        eStream.writeRaw(data, 0, length);
       } catch (IOException ie) {
         LOG.warn("Error in editStream " + eStream.getName(), ie);
         if(errorStreams == null)
@@ -1127,8 +1131,9 @@ public class FSEditLog implements NNStor
 
   void incrementCheckpointTime() {
     storage.incrementCheckpointTime();
-    Writable[] args = {new LongWritable(storage.getCheckpointTime())};
-    logEdit(OP_CHECKPOINT_TIME, args);
+    CheckpointTimeOp op = CheckpointTimeOp.getInstance()
+      .setCheckpointTime(storage.getCheckpointTime());
+    logEdit(op); 
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
@@ -1179,13 +1184,6 @@ public class FSEditLog implements NNStor
     return regAllowed;
   }
   
-  static BytesWritable toBytesWritable(Options.Rename... options) {
-    byte[] bytes = new byte[options.length];
-    for (int i = 0; i < options.length; i++) {
-      bytes[i] = options[i].value();
-    }
-    return new BytesWritable(bytes);
-  }
 
   /**
    * Get the StorageDirectory for a stream

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jul 26 20:46:58 2011
@@ -38,13 +38,16 @@ import static org.apache.hadoop.hdfs.ser
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.EOFException;
 
@@ -58,6 +61,43 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   final FSEditLogOpCodes opCode;
 
+  @SuppressWarnings("deprecation")
+  private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
+    new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
+      @Override
+      protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
+        EnumMap<FSEditLogOpCodes, FSEditLogOp> instances 
+          = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+        instances.put(OP_ADD, new AddOp());
+        instances.put(OP_CLOSE, new CloseOp());
+        instances.put(OP_SET_REPLICATION, new SetReplicationOp());
+        instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+        instances.put(OP_RENAME_OLD, new RenameOldOp());
+        instances.put(OP_DELETE, new DeleteOp());
+        instances.put(OP_MKDIR, new MkdirOp());
+        instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+        instances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+        instances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+        instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+        instances.put(OP_SET_OWNER, new SetOwnerOp());
+        instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+        instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+        instances.put(OP_SET_QUOTA, new SetQuotaOp());
+        instances.put(OP_TIMES, new TimesOp());
+        instances.put(OP_SYMLINK, new SymlinkOp());
+        instances.put(OP_RENAME, new RenameOp());
+        instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+        instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+        instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+        instances.put(OP_CANCEL_DELEGATION_TOKEN, 
+                      new CancelDelegationTokenOp());
+        instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+        instances.put(OP_CHECKPOINT_TIME, new CheckpointTimeOp());
+        instances.put(OP_JSPOOL_START, new JSpoolStartOp());
+        return instances;
+      }
+  };
+
   /**
    * Constructor for an EditLog Op. EditLog ops cannot be constructed
    * directly, but only through Reader#readOp.
@@ -66,10 +106,14 @@ public abstract class FSEditLogOp {
     this.opCode = opCode;
   }
 
-  public abstract void readFields(DataInputStream in, int logVersion)
+  abstract void readFields(DataInputStream in, int logVersion)
+      throws IOException;
+
+  abstract void writeFields(DataOutputStream out)
       throws IOException;
 
-  static class AddCloseOp extends FSEditLogOp {
+  @SuppressWarnings("unchecked")
+  static abstract class AddCloseOp extends FSEditLogOp {
     int length;
     String path;
     short replication;
@@ -87,7 +131,71 @@ public abstract class FSEditLogOp {
       assert(opCode == OP_ADD || opCode == OP_CLOSE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    <T extends AddCloseOp> T setPath(String path) {
+      this.path = path;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setReplication(short replication) {
+      this.replication = replication;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setAccessTime(long atime) {
+      this.atime = atime;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setBlockSize(long blockSize) {
+      this.blockSize = blockSize;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setBlocks(Block[] blocks) {
+      this.blocks = blocks;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
+      this.permissions = permissions;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setClientName(String clientName) {
+      this.clientName = clientName;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setClientMachine(String clientMachine) {
+      this.clientMachine = clientMachine;
+      return (T)this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path), 
+        toLogReplication(replication),
+        toLogLong(mtime),
+        toLogLong(atime),
+        toLogLong(blockSize)};
+      new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair).write(out);
+      new ArrayWritable(Block.class, blocks).write(out);
+      permissions.write(out);
+
+      if (this.opCode == OP_ADD) {
+        new DeprecatedUTF8(clientName).write(out);
+        new DeprecatedUTF8(clientMachine).write(out);
+      }
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       // versions > 0 support per file replication
       // get name and replication
@@ -168,6 +276,26 @@ public abstract class FSEditLogOp {
     }
   }
 
+  static class AddOp extends AddCloseOp { // AddCloseOp bound to opcode OP_ADD
+    private AddOp() {
+      super(OP_ADD);
+    }
+    // Returns whatever op opInstances holds under OP_ADD, cast to AddOp.
+    static AddOp getInstance() {
+      return (AddOp)opInstances.get().get(OP_ADD);
+    }
+  }
+
+  static class CloseOp extends AddCloseOp { // AddCloseOp bound to opcode OP_CLOSE
+    private CloseOp() {
+      super(OP_CLOSE);
+    }
+    // Returns whatever op opInstances holds under OP_CLOSE, cast to CloseOp.
+    static CloseOp getInstance() {
+      return (CloseOp)opInstances.get().get(OP_CLOSE);
+    }
+  }
+
   static class SetReplicationOp extends FSEditLogOp {
     String path;
     short replication;
@@ -176,7 +304,29 @@ public abstract class FSEditLogOp {
       super(OP_SET_REPLICATION);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetReplicationOp getInstance() {
+      return (SetReplicationOp)opInstances.get()
+        .get(OP_SET_REPLICATION);
+    }
+
+    SetReplicationOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SetReplicationOp setReplication(short replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    @Override // fields: path, then replication as decimal text (see readShort)
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(path).write(out);
+      toLogReplication(replication).write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.path = FSImageSerialization.readString(in);
       this.replication = readShort(in);
@@ -193,7 +343,41 @@ public abstract class FSEditLogOp {
       super(OP_CONCAT_DELETE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static ConcatDeleteOp getInstance() {
+      return (ConcatDeleteOp)opInstances.get()
+        .get(OP_CONCAT_DELETE);
+    }
+
+    ConcatDeleteOp setTarget(String trg) {
+      this.trg = trg;
+      return this;
+    }
+
+    ConcatDeleteOp setSources(String[] srcs) {
+      this.srcs = srcs;
+      return this;
+    }
+
+    ConcatDeleteOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+      int idx = 0;
+      info[idx++] = new DeprecatedUTF8(trg);
+      for(int i=0; i<srcs.length; i++) {
+        info[idx++] = new DeprecatedUTF8(srcs[i]);
+      }
+      info[idx] = toLogLong(timestamp);
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (length < 3) { // trg, srcs.., timestam
@@ -220,7 +404,37 @@ public abstract class FSEditLogOp {
       super(OP_RENAME_OLD);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenameOldOp getInstance() {
+      return (RenameOldOp)opInstances.get()
+        .get(OP_RENAME_OLD);
+    }
+
+    RenameOldOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    RenameOldOp setDestination(String dst) {
+      this.dst = dst;
+      return this;
+    }
+
+    RenameOldOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(src),
+        new DeprecatedUTF8(dst),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (this.length != 3) {
@@ -242,9 +456,32 @@ public abstract class FSEditLogOp {
       super(OP_DELETE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
-        throws IOException {
+    static DeleteOp getInstance() {
+      return (DeleteOp)opInstances.get()
+        .get(OP_DELETE);
+    }
+
+    DeleteOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    DeleteOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
 
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
       this.length = in.readInt();
       if (this.length != 2) {
         throw new IOException("Incorrect data format. "
@@ -264,8 +501,40 @@ public abstract class FSEditLogOp {
     private MkdirOp() {
       super(OP_MKDIR);
     }
+    
+    static MkdirOp getInstance() {
+      return (MkdirOp)opInstances.get()
+        .get(OP_MKDIR);
+    }
+
+    MkdirOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    MkdirOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    MkdirOp setPermissionStatus(PermissionStatus permissions) {
+      this.permissions = permissions;
+      return this;
+    }
 
-    public void readFields(DataInputStream in, int logVersion)
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+        new DeprecatedUTF8(path),
+        toLogLong(timestamp), // mtime
+        toLogLong(timestamp) // atime, unused at this time
+      };
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      permissions.write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
 
       this.length = in.readInt();
@@ -299,32 +568,70 @@ public abstract class FSEditLogOp {
       super(OP_SET_GENSTAMP);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetGenstampOp getInstance() {
+      return (SetGenstampOp)opInstances.get()
+        .get(OP_SET_GENSTAMP);
+    }
+
+    SetGenstampOp setGenerationStamp(long genStamp) {
+      this.genStamp = genStamp;
+      return this;
+    }
+    
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new LongWritable(genStamp).write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.genStamp = in.readLong();
     }
   }
 
+  @SuppressWarnings("deprecation")
   static class DatanodeAddOp extends FSEditLogOp {
-    @SuppressWarnings("deprecation")
     private DatanodeAddOp() {
       super(OP_DATANODE_ADD);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static DatanodeAddOp getInstance() {
+      return (DatanodeAddOp)opInstances.get()
+        .get(OP_DATANODE_ADD);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated, should not write");
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       //Datanodes are not persistent any more.
       FSImageSerialization.DatanodeImage.skipOne(in);
     }
   }
 
+  @SuppressWarnings("deprecation")
   static class DatanodeRemoveOp extends FSEditLogOp {
-    @SuppressWarnings("deprecation")
     private DatanodeRemoveOp() {
       super(OP_DATANODE_REMOVE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static DatanodeRemoveOp getInstance() {
+      return (DatanodeRemoveOp)opInstances.get()
+        .get(OP_DATANODE_REMOVE);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated, should not write");
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       DatanodeID nodeID = new DatanodeID();
       nodeID.readFields(in);
@@ -340,7 +647,29 @@ public abstract class FSEditLogOp {
       super(OP_SET_PERMISSIONS);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetPermissionsOp getInstance() {
+      return (SetPermissionsOp)opInstances.get()
+        .get(OP_SET_PERMISSIONS);
+    }
+
+    SetPermissionsOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetPermissionsOp setPermissions(FsPermission permissions) {
+      this.permissions = permissions;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(src).write(out);
+      permissions.write(out);
+     }
+ 
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.permissions = FsPermission.read(in);
@@ -356,13 +685,42 @@ public abstract class FSEditLogOp {
       super(OP_SET_OWNER);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetOwnerOp getInstance() {
+      return (SetOwnerOp)opInstances.get()
+        .get(OP_SET_OWNER);
+    }
+
+    SetOwnerOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetOwnerOp setUser(String username) {
+      this.username = username;
+      return this;
+    }
+
+    SetOwnerOp setGroup(String groupname) {
+      this.groupname = groupname;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
+      DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
+      new DeprecatedUTF8(src).write(out);
+      u.write(out);
+      g.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.username = FSImageSerialization.readString_EmptyAsNull(in);
       this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
     }
-
   }
 
   static class SetNSQuotaOp extends FSEditLogOp {
@@ -373,7 +731,18 @@ public abstract class FSEditLogOp {
       super(OP_SET_NS_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetNSQuotaOp getInstance() {
+      return (SetNSQuotaOp)opInstances.get()
+        .get(OP_SET_NS_QUOTA);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated");      
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.nsQuota = readLongWritable(in);
@@ -387,7 +756,18 @@ public abstract class FSEditLogOp {
       super(OP_CLEAR_NS_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static ClearNSQuotaOp getInstance() {
+      return (ClearNSQuotaOp)opInstances.get()
+        .get(OP_CLEAR_NS_QUOTA);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated");      
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
     }
@@ -402,7 +782,35 @@ public abstract class FSEditLogOp {
       super(OP_SET_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetQuotaOp getInstance() {
+      return (SetQuotaOp)opInstances.get()
+        .get(OP_SET_QUOTA);
+    }
+
+    SetQuotaOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetQuotaOp setNSQuota(long nsQuota) {
+      this.nsQuota = nsQuota;
+      return this;
+    }
+
+    SetQuotaOp setDSQuota(long dsQuota) {
+      this.dsQuota = dsQuota;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(src).write(out);
+      new LongWritable(nsQuota).write(out);
+      new LongWritable(dsQuota).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.nsQuota = readLongWritable(in);
@@ -420,7 +828,37 @@ public abstract class FSEditLogOp {
       super(OP_TIMES);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static TimesOp getInstance() {
+      return (TimesOp)opInstances.get()
+        .get(OP_TIMES);
+    }
+
+    TimesOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    TimesOp setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return this;
+    }
+
+    TimesOp setAccessTime(long atime) {
+      this.atime = atime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        toLogLong(mtime),
+        toLogLong(atime)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (length != 3) {
@@ -445,7 +883,49 @@ public abstract class FSEditLogOp {
       super(OP_SYMLINK);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SymlinkOp getInstance() {
+      return (SymlinkOp)opInstances.get()
+        .get(OP_SYMLINK);
+    }
+
+    SymlinkOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SymlinkOp setValue(String value) {
+      this.value = value;
+      return this;
+    }
+
+    SymlinkOp setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return this;
+    }
+
+    SymlinkOp setAccessTime(long atime) {
+      this.atime = atime;
+      return this;
+    }
+
+    SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
+      this.permissionStatus = permissionStatus;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        new DeprecatedUTF8(value),
+        toLogLong(mtime),
+        toLogLong(atime)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      permissionStatus.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
 
       this.length = in.readInt();
@@ -472,7 +952,43 @@ public abstract class FSEditLogOp {
       super(OP_RENAME);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenameOp getInstance() {
+      return (RenameOp)opInstances.get()
+        .get(OP_RENAME);
+    }
+
+    RenameOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    RenameOp setDestination(String dst) {
+      this.dst = dst;
+      return this;
+    }
+    
+    RenameOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+    
+    RenameOp setOptions(Rename[] options) {
+      this.options = options;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(src),
+        new DeprecatedUTF8(dst),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      toBytesWritable(options).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (this.length != 3) {
@@ -497,6 +1013,14 @@ public abstract class FSEditLogOp {
       }
       return options;
     }
+
+    static BytesWritable toBytesWritable(Rename... options) {
+      byte[] bytes = new byte[options.length];
+      for (int i = 0; i < options.length; i++) {
+        bytes[i] = options[i].value();
+      }
+      return new BytesWritable(bytes);
+    }
   }
 
   static class ReassignLeaseOp extends FSEditLogOp {
@@ -507,8 +1031,36 @@ public abstract class FSEditLogOp {
     private ReassignLeaseOp() {
       super(OP_REASSIGN_LEASE);
     }
-    
-    public void readFields(DataInputStream in, int logVersion)
+
+    static ReassignLeaseOp getInstance() {
+      return (ReassignLeaseOp)opInstances.get()
+        .get(OP_REASSIGN_LEASE);
+    }
+
+    ReassignLeaseOp setLeaseHolder(String leaseHolder) {
+      this.leaseHolder = leaseHolder;
+      return this;
+    }
+
+    ReassignLeaseOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    ReassignLeaseOp setNewHolder(String newHolder) {
+      this.newHolder = newHolder;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(leaseHolder).write(out);
+      new DeprecatedUTF8(path).write(out);
+      new DeprecatedUTF8(newHolder).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.leaseHolder = FSImageSerialization.readString(in);
       this.path = FSImageSerialization.readString(in);
@@ -524,7 +1076,30 @@ public abstract class FSEditLogOp {
       super(OP_GET_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static GetDelegationTokenOp getInstance() {
+      return (GetDelegationTokenOp)opInstances.get()
+        .get(OP_GET_DELEGATION_TOKEN);
+    }
+
+    GetDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    GetDelegationTokenOp setExpiryTime(long expiryTime) {
+      this.expiryTime = expiryTime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+      toLogLong(expiryTime).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -540,7 +1115,30 @@ public abstract class FSEditLogOp {
       super(OP_RENEW_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenewDelegationTokenOp getInstance() {
+      return (RenewDelegationTokenOp)opInstances.get()
+          .get(OP_RENEW_DELEGATION_TOKEN);
+    }
+
+    RenewDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    RenewDelegationTokenOp setExpiryTime(long expiryTime) {
+      this.expiryTime = expiryTime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+      toLogLong(expiryTime).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -555,7 +1153,24 @@ public abstract class FSEditLogOp {
       super(OP_CANCEL_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static CancelDelegationTokenOp getInstance() {
+      return (CancelDelegationTokenOp)opInstances.get()
+          .get(OP_CANCEL_DELEGATION_TOKEN);
+    }
+
+    CancelDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -569,13 +1184,97 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_MASTER_KEY);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static UpdateMasterKeyOp getInstance() {
+      return (UpdateMasterKeyOp)opInstances.get()
+          .get(OP_UPDATE_MASTER_KEY);
+    }
+
+    UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
+      this.key = key;
+      return this;
+    }
+    
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      key.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.key = new DelegationKey();
       this.key.readFields(in);
     }
   }
-  
+
+  static class InvalidOp extends FSEditLogOp { // op for OP_INVALID; has no fields
+    private InvalidOp() {
+      super(OP_INVALID);
+    }
+    // NOTE(review): null unless opInstances has an OP_INVALID entry - confirm.
+    static InvalidOp getInstance() {
+      return (InvalidOp)opInstances.get().get(OP_INVALID);
+    }
+    // Empty body: this op writes no fields.
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      // nothing to read
+    }
+  }
+
+  static class JSpoolStartOp extends FSEditLogOp { // op for OP_JSPOOL_START
+    private JSpoolStartOp() {
+      super(OP_JSPOOL_START);
+    }
+    // Returns the op stored under OP_JSPOOL_START in opInstances.
+    static JSpoolStartOp getInstance() {
+      return (JSpoolStartOp)opInstances.get().get(OP_JSPOOL_START);
+    }
+    // Empty body: this op writes no fields.
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+    }
+    // Empty body: there are no fields to read back.
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+    }
+  }
+
+  static class CheckpointTimeOp extends FSEditLogOp {
+    long checkpointTime; // value carried by the op; set via setCheckpointTime()
+
+    private CheckpointTimeOp() {
+      super(OP_CHECKPOINT_TIME);
+    }
+
+    CheckpointTimeOp setCheckpointTime(long time) {
+      this.checkpointTime = time;
+      return this;
+    }
+
+    static CheckpointTimeOp getInstance() {
+      return (CheckpointTimeOp)opInstances.get().get(OP_CHECKPOINT_TIME);
+    }
+
+    @Override // payload is a single LongWritable, i.e. a binary long
+    void writeFields(DataOutputStream out) throws IOException {
+      new LongWritable(checkpointTime).write(out);
+    }
+
+    // Must mirror writeFields(): read the binary long (as SetGenstampOp does).
+    // readLong(in) would instead try to parse a decimal string.
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.checkpointTime = in.readLong();
+    }
+  }
+
   static private short readShort(DataInputStream in) throws IOException {
     return Short.parseShort(FSImageSerialization.readString(in));
   }
@@ -584,6 +1283,14 @@ public abstract class FSEditLogOp {
     return Long.parseLong(FSImageSerialization.readString(in));
   }
 
+  static private DeprecatedUTF8 toLogReplication(short replication) {
+    return new DeprecatedUTF8(Short.toString(replication));
+  }
+  
+  static private DeprecatedUTF8 toLogLong(long timestamp) {
+    return new DeprecatedUTF8(Long.toString(timestamp));
+  }
+
   /**
    * A class to read in blocks stored in the old format. The only two
    * fields in the block were blockid and length.
@@ -631,13 +1338,36 @@ public abstract class FSEditLogOp {
   }
 
   /**
+   * Class for writing editlog ops
+   */
+  public static class Writer {
+    private final DataOutputStream out;
+    /** @param out stream that every writeOp() call writes to */
+    public Writer(DataOutputStream out) {
+      this.out = out;
+    }
+
+    /**
+     * Write an operation to the output stream: one opcode byte followed by
+     * whatever the op's own writeFields() emits.
+     * @param op The operation to write
+     * @throws IOException if an error occurs during writing.
+     */
+    public void writeOp(FSEditLogOp op) throws IOException {
+      out.writeByte(op.opCode.getOpCode());
+      
+      op.writeFields(out);
+    }
+  }
+
+  /**
    * Class for reading editlog ops from a stream
    */
   public static class Reader {
     private final DataInputStream in;
     private final int logVersion;
     private final Checksum checksum;
-    private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+
     /**
      * Construct the reader
      * @param in The stream to read from.
@@ -650,32 +1380,6 @@ public abstract class FSEditLogOp {
       this.in = in;
       this.logVersion = logVersion;
       this.checksum = checksum;
-      opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
-          FSEditLogOpCodes.class);
-      opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
-      opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
-      opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
-      opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-      opInstances.put(OP_RENAME_OLD, new RenameOldOp());
-      opInstances.put(OP_DELETE, new DeleteOp());
-      opInstances.put(OP_MKDIR, new MkdirOp());
-      opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
-      opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
-      opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
-      opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-      opInstances.put(OP_SET_OWNER, new SetOwnerOp());
-      opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-      opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-      opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
-      opInstances.put(OP_TIMES, new TimesOp());
-      opInstances.put(OP_SYMLINK, new SymlinkOp());
-      opInstances.put(OP_RENAME, new RenameOp());
-      opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-      opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-      opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-      opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
-                      new CancelDelegationTokenOp());
-      opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
     }
 
     /**
@@ -708,7 +1412,7 @@ public abstract class FSEditLogOp {
         return null;
       }
 
-      FSEditLogOp op = opInstances.get(opCode);
+      FSEditLogOp op = opInstances.get().get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }