Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 09:10:53 UTC

svn commit: r1152128 [2/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/...

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Fri Jul 29 07:10:48 2011
@@ -107,7 +107,7 @@ public class BackupImage extends FSImage
       StorageDirectory sd = it.next();
       StorageState curState;
       try {
-        curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+        curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR, storage);
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
@@ -126,7 +126,8 @@ public class BackupImage extends FSImage
           sd.doRecover(curState);
         }
         if(curState != StorageState.NOT_FORMATTED) {
-          sd.read(); // read and verify consistency with other directories
+          // read and verify consistency with other directories
+          storage.readProperties(sd);
         }
       } catch(IOException ioe) {
         sd.unlock();
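
This hunk changes BackupImage so that storage-directory handling goes through the shared storage object: analyzeStorage() now receives it, and per-directory properties are read with storage.readProperties(sd) instead of sd.read(). A minimal, self-contained sketch of that analyze/recover/read-properties loop (all class, enum, and method names below are hypothetical stand-ins, not the HDFS API):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins illustrating the analyze/recover/read-properties
// loop shown in the BackupImage hunk above; not the real HDFS classes.
enum StorageState { NON_EXISTENT, NOT_FORMATTED, NEEDS_RECOVERY, NORMAL }

class StorageDirectory {
  final String root;
  StorageState state;
  StorageDirectory(String root, StorageState state) { this.root = root; this.state = state; }
  StorageState analyzeStorage() { return state; }                 // inspect and lock the directory
  void doRecover(StorageState s) { state = StorageState.NORMAL; } // undo a partially applied update
  void unlock() { /* release the directory lock */ }
}

class Storage {
  // Read VERSION-style properties and verify consistency with other directories.
  void readProperties(StorageDirectory sd) throws IOException {
    if (sd.state == StorageState.NOT_FORMATTED) {
      throw new IOException("Directory " + sd.root + " is not formatted");
    }
  }
}

public class RecoverStorageSketch {
  public static void main(String[] args) throws IOException {
    Storage storage = new Storage();
    List<StorageDirectory> dirs = Arrays.asList(
        new StorageDirectory("/dfs/name1", StorageState.NORMAL),
        new StorageDirectory("/dfs/name2", StorageState.NEEDS_RECOVERY));
    for (StorageDirectory sd : dirs) {
      try {
        StorageState cur = sd.analyzeStorage();
        if (cur == StorageState.NEEDS_RECOVERY) {
          sd.doRecover(cur);            // complete or roll back the interrupted transition
        }
        if (cur != StorageState.NOT_FORMATTED) {
          storage.readProperties(sd);   // consistency check now lives on the shared object
        }
      } catch (IOException ioe) {
        sd.unlock();                    // release the lock before giving up on this directory
        throw ioe;
      }
    }
    System.out.println("all storage directories analyzed");
  }
}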

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Jul 29 07:10:48 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -27,7 +26,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 
@@ -45,26 +43,9 @@ class EditLogBackupOutputStream extends 
   private JournalProtocol backupNode;        // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
-  private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
-  private ArrayList<JournalRecord> bufReady;    // buffer ready for flushing
+  private EditsDoubleBuffer doubleBuf;
   private DataOutputBuffer out;     // serialized output sent to backup node
 
-  static class JournalRecord {
-    byte op;
-    long txid;
-    Writable[] args;
-
-    JournalRecord(byte op, long txid, Writable ... writables) {
-      this.op = op;
-      this.txid = txid;
-      this.args = writables;
-    }
-
-    void write(DataOutputBuffer out) throws IOException {
-      writeChecksummedOp(out, op, txid, args);
-    }
-  }
-
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
                             NamenodeRegistration nnReg) // active name-node
   throws IOException {
@@ -82,8 +63,7 @@ class EditLogBackupOutputStream extends 
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
     }
-    this.bufCurrent = new ArrayList<JournalRecord>();
-    this.bufReady = new ArrayList<JournalRecord>();
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
   
@@ -97,14 +77,14 @@ class EditLogBackupOutputStream extends 
     return JournalType.BACKUP;
   }
 
-  @Override
-  void write(byte[] data, int i, int length) throws IOException {
-    throw new IOException("Not implemented");
-  }
-
   @Override // EditLogOutputStream
-  void write(byte op, long txid, Writable ... writables) throws IOException {
-    bufCurrent.add(new JournalRecord(op, txid, writables));
+  void write(FSEditLogOp op) throws IOException {
+    doubleBuf.writeOp(op);
+ }
+
+  @Override
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    throw new IOException("Not supported");
   }
 
   /**
@@ -112,51 +92,53 @@ class EditLogBackupOutputStream extends 
    */
   @Override // EditLogOutputStream
   void create() throws IOException {
-    bufCurrent.clear();
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
+    assert doubleBuf.isFlushed() : "previous data is not flushed yet";
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
   }
 
   @Override // EditLogOutputStream
   public void close() throws IOException {
     // close should have been called after all pending transactions 
     // have been flushed & synced.
-    int size = bufCurrent.size();
+    int size = doubleBuf.countBufferedBytes();
     if (size != 0) {
       throw new IOException("BackupEditStream has " + size +
                           " records still to be flushed and cannot be closed.");
     } 
     RPC.stopProxy(backupNode); // stop the RPC threads
-    bufCurrent = bufReady = null;
+    doubleBuf.close();
+    doubleBuf = null;
   }
 
   @Override
   public void abort() throws IOException {
     RPC.stopProxy(backupNode);
-    bufCurrent = bufReady = null;
+    doubleBuf = null;
   }
 
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    ArrayList<JournalRecord>  tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
+    doubleBuf.setReadyToFlush();
   }
 
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
-    assert out.size() == 0 : "Output buffer is not empty";
-    for (JournalRecord jRec : bufReady) {
-      jRec.write(out);
-    }
-    if (out.size() > 0) {
+    assert out.getLength() == 0 : "Output buffer is not empty";
+    
+    int numReadyTxns = doubleBuf.countReadyTxns();
+    long firstTxToFlush = doubleBuf.getFirstReadyTxId();
+    
+    doubleBuf.flushTo(out);
+    if (out.getLength() > 0) {
+      assert numReadyTxns > 0;
+      
       byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+      out.reset();
+      assert out.getLength() == 0 : "Output buffer is not empty";
+
       backupNode.journal(nnRegistration,
-          bufReady.get(0).txid, bufReady.size(),
-          data);
+          firstTxToFlush, numReadyTxns, data);
     }
-    bufReady.clear();         // erase all data in the buffer
-    out.reset();              // reset buffer to the start position
   }
 
   /**
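
The EditLogBackupOutputStream rewrite above drops the two ArrayList<JournalRecord> buffers and the per-record Writable serialization in favor of a shared EditsDoubleBuffer: ops accumulate in one buffer while the previously filled one is flushed to the backup node over RPC, together with its first transaction ID and transaction count. A minimal sketch of the double-buffering idea (hypothetical class, not the real EditsDoubleBuffer):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Minimal double-buffer sketch: writes go to bufCurrent while bufReady is
// being flushed, so logging and flushing never block each other.
// Illustration only; the real class in this commit is EditsDoubleBuffer.
class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

  void write(byte[] record) { bufCurrent.write(record, 0, record.length); }

  boolean isFlushed() { return bufReady.size() == 0; }

  void setReadyToFlush() {
    assert isFlushed() : "previous data not flushed yet";
    ByteArrayOutputStream tmp = bufReady;   // swap the two buffers
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out);                  // push the ready buffer downstream
    bufReady.reset();                       // and make it reusable
  }

  int countBufferedBytes() { return bufCurrent.size() + bufReady.size(); }
}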

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Jul 29 07:10:48 2011
@@ -28,9 +28,7 @@ import java.nio.channels.FileChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,10 +44,7 @@ class EditLogFileOutputStream extends Ed
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
   private FileChannel fc; // channel of the file stream for sync
-  private DataOutputBuffer bufCurrent; // current buffer for writing
-  private DataOutputBuffer bufReady; // buffer ready for flushing
-  final private int initBufferSize; // inital buffer size
-
+  private EditsDoubleBuffer doubleBuf;
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
 
   static {
@@ -71,9 +66,7 @@ class EditLogFileOutputStream extends Ed
   EditLogFileOutputStream(File name, int size) throws IOException {
     super();
     file = name;
-    initBufferSize = size;
-    bufCurrent = new DataOutputBuffer(size);
-    bufReady = new DataOutputBuffer(size);
+    doubleBuf = new EditsDoubleBuffer(size);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     fp = new FileOutputStream(rp.getFD()); // open for append
     fc = rp.getChannel();
@@ -90,15 +83,10 @@ class EditLogFileOutputStream extends Ed
     return JournalType.FILE;
   }
 
-  /**
-   * Write a single byte to the output stream.
-   * @param b the byte to write
-   */
-  private void write(int b) throws IOException {
-    if (fp == null) {
-      throw new IOException("Trying to use aborted output stream");
-    }
-    bufCurrent.write(b);
+  /** {@inheritDoc} */
+  @Override
+  void write(FSEditLogOp op) throws IOException {
+    doubleBuf.writeOp(op);
   }
 
   /**
@@ -110,16 +98,8 @@ class EditLogFileOutputStream extends Ed
    * </ul>
    * */
   @Override
-  void write(byte op, long txid, Writable... writables) throws IOException {
-    if (fp == null) {
-      throw new IOException("Trying to use aborted output stream");
-    }
-    writeChecksummedOp(bufCurrent, op, txid, writables);
-  }
-
-  @Override
-  void write(final byte[] data, int off, int len) throws IOException {
-    bufCurrent.write(data, off, len);
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    doubleBuf.writeRaw(bytes, offset, length);
   }
 
   /**
@@ -129,7 +109,7 @@ class EditLogFileOutputStream extends Ed
   void create() throws IOException {
     fc.truncate(0);
     fc.position(0);
-    bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+    doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION);
     setReadyToFlush();
     flush();
   }
@@ -144,22 +124,11 @@ class EditLogFileOutputStream extends Ed
       // close should have been called after all pending transactions
       // have been flushed & synced.
       // if already closed, just skip
-      if(bufCurrent != null)
-      {
-        int bufSize = bufCurrent.size();
-        if (bufSize != 0) {
-          throw new IOException("FSEditStream has " + bufSize
-              + " bytes still to be flushed and cannot " + "be closed.");
-        }
-        bufCurrent.close();
-        bufCurrent = null;
-      }
-  
-      if(bufReady != null) {
-        bufReady.close();
-        bufReady = null;
+      if (doubleBuf != null) {
+        doubleBuf.close();
+        doubleBuf = null;
       }
-  
+      
       // remove the last INVALID marker from transaction log.
       if (fc != null && fc.isOpen()) {
         fc.truncate(fc.position());
@@ -171,8 +140,8 @@ class EditLogFileOutputStream extends Ed
         fp = null;
       }
     } finally {
-      IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
-      bufCurrent = bufReady = null;
+      IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
+      doubleBuf = null;
       fc = null;
       fp = null;
     }
@@ -194,11 +163,8 @@ class EditLogFileOutputStream extends Ed
    */
   @Override
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
-    DataOutputBuffer tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
+    doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+    doubleBuf.setReadyToFlush();
   }
 
   /**
@@ -212,8 +178,7 @@ class EditLogFileOutputStream extends Ed
     }
     
     preallocate(); // preallocate file if necessary
-    bufReady.writeTo(fp); // write data to file
-    bufReady.reset(); // erase all data in the buffer
+    doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
     fc.position(fc.position() - 1); // skip back the end-of-file marker
   }
@@ -223,7 +188,7 @@ class EditLogFileOutputStream extends Ed
    */
   @Override
   public boolean shouldForceSync() {
-    return bufReady.size() >= initBufferSize;
+    return doubleBuf.shouldForceSync();
   }
   
   /**
@@ -232,8 +197,8 @@ class EditLogFileOutputStream extends Ed
   @Override
   long length() throws IOException {
     // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size()
-        + bufCurrent.size();
+    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + 
+      doubleBuf.countBufferedBytes();
   }
 
   // allocate a big chunk of data
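
EditLogFileOutputStream now routes through the same EditsDoubleBuffer, and its flush cycle keeps the old end-of-file-marker trick: setReadyToFlush() appends OP_INVALID and swaps buffers, flushAndSync() writes the ready buffer to the file channel, forces it to disk, and steps the position back one byte so the next flush overwrites the marker. A simplified sketch of that marker trick (the opcode value is illustrative; this is not the real EditLogFileOutputStream):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Sketch of the end-of-file-marker trick: every flush ends with a one-byte
// OP_INVALID marker, and the shared file offset is stepped back over it so
// the next flush overwrites the marker instead of leaving it mid-log.
public class EofMarkerSketch {
  private static final byte OP_INVALID = (byte) 0xFF; // illustrative opcode value

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("edits", ".log");
    RandomAccessFile rp = new RandomAccessFile(f, "rw");
    FileOutputStream fp = new FileOutputStream(rp.getFD()); // shares rp's file offset
    FileChannel fc = rp.getChannel();

    fp.write("txn-1,txn-2".getBytes(StandardCharsets.UTF_8)); // flush the ready buffer
    fp.write(OP_INVALID);                                     // append the end-of-file marker
    fc.force(false);                                          // sync data to disk
    fc.position(fc.position() - 1);  // step back so the marker is overwritten next time

    fp.write("txn-3".getBytes(StandardCharsets.UTF_8));
    fp.write(OP_INVALID);
    fc.force(false);

    rp.close();                      // closes the shared descriptor
    System.out.println("log written to " + f);
  }
}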

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Jul 29 07:10:48 2011
@@ -39,26 +39,27 @@ abstract class EditLogOutputStream imple
   }
 
   /**
-   * Write edits log record into the stream.
-   * The record is represented by operation name and
-   * an array of Writable arguments.
+   * Write edits log operation to the stream.
    * 
    * @param op operation
-   * @param txid the transaction ID of this operation
-   * @param writables array of Writable arguments
    * @throws IOException
    */
-  abstract void write(byte op, long txid, Writable ... writables)
-  throws IOException;
-  
+  abstract void write(FSEditLogOp op) throws IOException;
+
   /**
    * Write raw data to an edit log. This data should already have
    * the transaction ID, checksum, etc included. It is for use
    * within the BackupNode when replicating edits from the
    * NameNode.
+   *
+   * @param bytes the bytes to write.
+   * @param offset offset in the bytes to write from
+   * @param length number of bytes to write
+   * @throws IOException
    */
-  abstract void write(byte[] data, int offset, int length) throws IOException;
-  
+  abstract void writeRaw(byte[] bytes, int offset, int length)
+      throws IOException;
+
   /**
    * Create and initialize underlying persistent edits log storage.
    * 
@@ -139,26 +140,4 @@ abstract class EditLogOutputStream imple
   public String toString() {
     return getName();
   }
-
-  /**
-   * Write the given operation to the specified buffer, including
-   * the transaction ID and checksum.
-   */
-  protected static void writeChecksummedOp(
-      DataOutputBuffer buf, byte op, long txid, Writable... writables)
-      throws IOException {
-    int start = buf.getLength();
-    buf.write(op);
-    buf.writeLong(txid);
-    for (Writable w : writables) {
-      w.write(buf);
-    }
-    // write transaction checksum
-    int end = buf.getLength();
-    Checksum checksum = FSEditLog.getChecksum();
-    checksum.reset();
-    checksum.update(buf.getData(), start, end-start);
-    int sum = (int)checksum.getValue();
-    buf.writeInt(sum);
-  }
 }
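
EditLogOutputStream loses writeChecksummedOp() because record serialization now belongs to FSEditLogOp and its Writer: each record is framed as opcode, transaction ID, payload, then a CRC computed over exactly those bytes, matching the removed helper above. A compact sketch of that framing (hypothetical writer class; CRC32 stands in for whatever checksum FSEditLog.getChecksum() supplies):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Sketch of checksummed record framing: opcode + txid + payload, followed by
// a CRC computed over exactly those bytes, mirroring writeChecksummedOp above.
class ChecksummedRecordWriter {
  private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
  private final Checksum checksum = new CRC32();

  void writeOp(byte opCode, long txid, byte[] payload) throws IOException {
    ByteArrayOutputStream record = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(record);
    out.writeByte(opCode);
    out.writeLong(txid);
    out.write(payload);
    out.flush();

    byte[] body = record.toByteArray();
    checksum.reset();
    checksum.update(body, 0, body.length);   // checksum covers opcode, txid and payload

    buf.write(body);
    new DataOutputStream(buf).writeInt((int) checksum.getValue()); // trailing CRC
  }

  byte[] toByteArray() { return buf.toByteArray(); }
}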

Copied: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (from r1151750, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?p2=hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java&p1=hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java&r1=1151750&r2=1152128&rev=1152128&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Jul 29 07:10:48 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
@@ -35,20 +36,19 @@ import com.google.common.base.Preconditi
  */
 class EditsDoubleBuffer {
 
-  private DataOutputBuffer bufCurrent; // current buffer for writing
-  private DataOutputBuffer bufReady; // buffer ready for flushing
+  private TxnBuffer bufCurrent; // current buffer for writing
+  private TxnBuffer bufReady; // buffer ready for flushing
   private final int initBufferSize;
-  private Writer writer;
 
   public EditsDoubleBuffer(int defaultBufferSize) {
     initBufferSize = defaultBufferSize;
-    bufCurrent = new DataOutputBuffer(initBufferSize);
-    bufReady = new DataOutputBuffer(initBufferSize);
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    bufCurrent = new TxnBuffer(initBufferSize);
+    bufReady = new TxnBuffer(initBufferSize);
+
   }
     
   public void writeOp(FSEditLogOp op) throws IOException {
-    writer.writeOp(op);
+    bufCurrent.writeOp(op);
   }
 
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -71,10 +71,9 @@ class EditsDoubleBuffer {
   
   void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
-    DataOutputBuffer tmp = bufReady;
+    TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
-    writer = new FSEditLogOp.Writer(bufCurrent);
   }
   
   /**
@@ -102,4 +101,50 @@ class EditsDoubleBuffer {
     return bufReady.size() + bufCurrent.size();
   }
 
+  /**
+   * @return the transaction ID of the first transaction ready to be flushed 
+   */
+  public long getFirstReadyTxId() {
+    assert bufReady.firstTxId > 0;
+    return bufReady.firstTxId;
+  }
+
+  /**
+   * @return the number of transactions that are ready to be flushed
+   */
+  public int countReadyTxns() {
+    return bufReady.numTxns;
+  }
+
+  
+  private static class TxnBuffer extends DataOutputBuffer {
+    long firstTxId;
+    int numTxns;
+    private Writer writer;
+    
+    public TxnBuffer(int initBufferSize) {
+      super(initBufferSize);
+      writer = new FSEditLogOp.Writer(this);
+      reset();
+    }
+
+    public void writeOp(FSEditLogOp op) throws IOException {
+      if (firstTxId == FSConstants.INVALID_TXID) {
+        firstTxId = op.txid;
+      } else {
+        assert op.txid > firstTxId;
+      }
+      writer.writeOp(op);
+      numTxns++;
+    }
+    
+    @Override
+    public DataOutputBuffer reset() {
+      super.reset();
+      firstTxId = FSConstants.INVALID_TXID;
+      numTxns = 0;
+      return this;
+    }
+  }
+
 }
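
The copied EditsDoubleBuffer swaps its plain DataOutputBuffers for a TxnBuffer that also records the first transaction ID and the number of ops appended, so a flush can report exactly which transaction range it carries (see getFirstReadyTxId() and countReadyTxns() above). A compact sketch of that bookkeeping (hypothetical names; INVALID_TXID here is an illustrative sentinel):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch of a transaction-aware buffer: alongside the raw bytes it records
// the first transaction ID and how many transactions were appended, so the
// flusher can tell the receiver "these N txns start at txid X".
// Hypothetical stand-in for the TxnBuffer added in the diff above.
class TxnBufferSketch extends ByteArrayOutputStream {
  static final long INVALID_TXID = -1;  // illustrative sentinel
  long firstTxId = INVALID_TXID;
  int numTxns = 0;

  void writeOp(long txid, byte[] serializedOp) throws IOException {
    if (firstTxId == INVALID_TXID) {
      firstTxId = txid;                 // remember where this batch starts
    } else {
      assert txid > firstTxId;          // transaction IDs must move forward
    }
    write(serializedOp);
    numTxns++;
  }

  @Override
  public synchronized void reset() {
    super.reset();
    firstTxId = INVALID_TXID;
    numTxns = 0;
  }
}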

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 29 07:10:48 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
+import java.util.zip.CheckedOutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,7 +29,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditi
 import com.google.common.collect.Lists;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -207,7 +207,7 @@ public class FSEditLog  {
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
+  void logEdit(final FSEditLogOp op) {
     synchronized (this) {
       assert state != State.CLOSED;
       
@@ -219,12 +219,13 @@ public class FSEditLog  {
       }
       
       long start = beginTransaction();
+      op.setTransactionId(txid);
 
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override 
         public void apply(JournalAndStream jas) throws IOException {
           if (!jas.isActive()) return;
-          jas.stream.write(opCode.getOpCode(), txid, writables);
+          jas.stream.write(op);
         }
       }, "logging edit");
 
@@ -520,49 +521,45 @@ public class FSEditLog  {
    * Records the block locations of the last block.
    */
   public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(path), 
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_ADD,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair), 
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus(),
-            new DeprecatedUTF8(newNode.getClientName()),
-            new DeprecatedUTF8(newNode.getClientMachine()));
+    AddOp op = AddOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus())
+      .setClientName(newNode.getClientName())
+      .setClientMachine(newNode.getClientMachine());
+    
+      logEdit(op);
   }
 
   /** 
    * Add close lease record to edit log.
    */
   public void logCloseFile(String path, INodeFile newNode) {
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_CLOSE,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus());
+    CloseOp op = CloseOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus());
+    
+    logEdit(op);
   }
   
   /** 
    * Add create directory record to edit log
    */
   public void logMkDir(String path, INode newNode) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime())
-    };
-    logEdit(OP_MKDIR,
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      newNode.getPermissionStatus());
+    MkdirOp op = MkdirOp.getInstance()
+      .setPath(path)
+      .setTimestamp(newNode.getModificationTime())
+      .setPermissionStatus(newNode.getPermissionStatus());
+    logEdit(op);
   }
   
   /** 
@@ -570,33 +567,33 @@ public class FSEditLog  {
    * TODO: use String parameters until just before writing to disk
    */
   void logRename(String src, String dst, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+    RenameOldOp op = RenameOldOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
   
   /** 
    * Add rename record to edit log
    */
   void logRename(String src, String dst, long timestamp, Options.Rename... options) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME,
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      toBytesWritable(options));
+    RenameOp op = RenameOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp)
+      .setOptions(options);
+    logEdit(op);
   }
   
   /** 
    * Add set replication record to edit log
    */
   void logSetReplication(String src, short replication) {
-    logEdit(OP_SET_REPLICATION, 
-      new DeprecatedUTF8(src), 
-      FSEditLog.toLogReplication(replication));
+    SetReplicationOp op = SetReplicationOp.getInstance()
+      .setPath(src)
+      .setReplication(replication);
+    logEdit(op);
   }
   
   /** Add set namespace quota record to edit log
@@ -605,64 +602,69 @@ public class FSEditLog  {
    * @param quota the directory size limit
    */
   void logSetQuota(String src, long nsQuota, long dsQuota) {
-    logEdit(OP_SET_QUOTA,
-      new DeprecatedUTF8(src), 
-      new LongWritable(nsQuota), new LongWritable(dsQuota));
+    SetQuotaOp op = SetQuotaOp.getInstance()
+      .setSource(src)
+      .setNSQuota(nsQuota)
+      .setDSQuota(dsQuota);
+    logEdit(op);
   }
 
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
-    logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+    SetPermissionsOp op = SetPermissionsOp.getInstance()
+      .setSource(src)
+      .setPermissions(permissions);
+    logEdit(op);
   }
 
   /**  Add set owner record to edit log */
   void logSetOwner(String src, String username, String groupname) {
-    DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
-    DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
-    logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+    SetOwnerOp op = SetOwnerOp.getInstance()
+      .setSource(src)
+      .setUser(username)
+      .setGroup(groupname);
+    logEdit(op);
   }
   
   /**
    * concat(trg,src..) log
    */
   void logConcat(String trg, String [] srcs, long timestamp) {
-    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
-    int idx = 0;
-    info[idx++] = new DeprecatedUTF8(trg);
-    for(int i=0; i<srcs.length; i++) {
-      info[idx++] = new DeprecatedUTF8(srcs[i]);
-    }
-    info[idx] = FSEditLog.toLogLong(timestamp);
-    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+      .setTarget(trg)
+      .setSources(srcs)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
   
   /** 
    * Add delete file record to edit log
    */
   void logDelete(String src, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    DeleteOp op = DeleteOp.getInstance()
+      .setPath(src)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
 
   /** 
    * Add generation stamp record to edit log
    */
   void logGenerationStamp(long genstamp) {
-    logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+    SetGenstampOp op = SetGenstampOp.getInstance()
+      .setGenerationStamp(genstamp);
+    logEdit(op);
   }
 
   /** 
    * Add access time record to edit log
    */
   void logTimes(String src, long mtime, long atime) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+    TimesOp op = TimesOp.getInstance()
+      .setPath(src)
+      .setModificationTime(mtime)
+      .setAccessTime(atime);
+    logEdit(op);
   }
 
   /** 
@@ -670,14 +672,13 @@ public class FSEditLog  {
    */
   void logSymlink(String path, String value, long mtime, 
                   long atime, INodeSymlink node) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
-      new DeprecatedUTF8(path),
-      new DeprecatedUTF8(value),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_SYMLINK, 
-      new ArrayWritable(DeprecatedUTF8.class, info),
-      node.getPermissionStatus());
+    SymlinkOp op = SymlinkOp.getInstance()
+      .setPath(path)
+      .setValue(value)
+      .setModificationTime(mtime)
+      .setAccessTime(atime)
+      .setPermissionStatus(node.getPermissionStatus());
+    logEdit(op);
   }
   
   /**
@@ -688,36 +689,40 @@ public class FSEditLog  {
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
   
   void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
   
   void logCancelDelegationToken(DelegationTokenIdentifier id) {
-    logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+    CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id);
+    logEdit(op);
   }
   
   void logUpdateMasterKey(DelegationKey key) {
-    logEdit(OP_UPDATE_MASTER_KEY, key);
+    UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+      .setDelegationKey(key);
+    logEdit(op);
   }
 
   void logReassignLease(String leaseHolder, String src, String newHolder) {
-    logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
-        new DeprecatedUTF8(src),
-        new DeprecatedUTF8(newHolder));
-  }
-  
-  static private DeprecatedUTF8 toLogReplication(short replication) {
-    return new DeprecatedUTF8(Short.toString(replication));
+    ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+      .setLeaseHolder(leaseHolder)
+      .setPath(src)
+      .setNewHolder(newHolder);
+    logEdit(op);
   }
   
-  static private DeprecatedUTF8 toLogLong(long timestamp) {
-    return new DeprecatedUTF8(Long.toString(timestamp));
-  }
-
   /**
    * @return the number of active (non-failed) journals
    */
@@ -818,7 +823,8 @@ public class FSEditLog  {
     state = State.IN_SEGMENT;
 
     if (writeHeaderTxn) {
-      logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
+      logEdit(LogSegmentOp.getInstance(
+          FSEditLogOpCodes.OP_START_LOG_SEGMENT));
       logSync();
     }
   }
@@ -833,7 +839,8 @@ public class FSEditLog  {
         "Bad state: %s", state);
     
     if (writeEndTxn) {
-      logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+      logEdit(LogSegmentOp.getInstance(
+          FSEditLogOpCodes.OP_END_LOG_SEGMENT));
       logSync();
     }
 
@@ -992,7 +999,7 @@ public class FSEditLog  {
       @Override
       public void apply(JournalAndStream jas) throws IOException {
         if (jas.isActive()) {
-          jas.getCurrentStream().write(data, 0, length);
+          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
         }
       }      
     }, "Logging edit");
@@ -1000,14 +1007,6 @@ public class FSEditLog  {
     endTransaction(start);
   }
 
-  static BytesWritable toBytesWritable(Options.Rename... options) {
-    byte[] bytes = new byte[options.length];
-    for (int i = 0; i < options.length; i++) {
-      bytes[i] = options[i].value();
-    }
-    return new BytesWritable(bytes);
-  }
-  
   //// Iteration across journals
   private interface JournalClosure {
     public void apply(JournalAndStream jas) throws IOException;
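
FSEditLog no longer assembles arrays of Writables per operation; each log method now fetches a reusable, per-thread op instance (FSEditLogOp keeps a ThreadLocal EnumMap of them, as the FSEditLogOp.java hunks below show), fills it with chained setters, and passes it to logEdit(FSEditLogOp). A simplified sketch of that pattern (all names hypothetical, not the real FSEditLogOp hierarchy):

import java.util.EnumMap;

// Simplified sketch of the op-object pattern: a per-thread cache of reusable
// op instances plus chained setters, so logging a mkdir becomes
// MkdirOp.getInstance().setPath(...).setTimestamp(...) handed to logEdit(op).
public class OpBuilderSketch {
  enum OpCode { OP_MKDIR, OP_DELETE }

  static abstract class Op {
    final OpCode opCode;
    long txid;
    Op(OpCode opCode) { this.opCode = opCode; }
  }

  static class MkdirOp extends Op {
    String path;
    long timestamp;
    private MkdirOp() { super(OpCode.OP_MKDIR); }

    static MkdirOp getInstance() {          // reuse the thread-local instance
      return (MkdirOp) OP_INSTANCES.get().get(OpCode.OP_MKDIR);
    }
    MkdirOp setPath(String path) { this.path = path; return this; }
    MkdirOp setTimestamp(long ts) { this.timestamp = ts; return this; }
  }

  // One instance of each op per thread avoids allocating a new op per edit.
  static final ThreadLocal<EnumMap<OpCode, Op>> OP_INSTANCES =
      new ThreadLocal<EnumMap<OpCode, Op>>() {
        @Override
        protected EnumMap<OpCode, Op> initialValue() {
          EnumMap<OpCode, Op> m = new EnumMap<OpCode, Op>(OpCode.class);
          m.put(OpCode.OP_MKDIR, new MkdirOp());
          return m;
        }
      };

  static void logEdit(Op op) {
    op.txid = 42;                           // beginTransaction() would assign this
    System.out.println("logged " + op.opCode + " txid=" + op.txid);
  }

  public static void main(String[] args) {
    MkdirOp op = MkdirOp.getInstance()
        .setPath("/user/foo")
        .setTimestamp(System.currentTimeMillis());
    logEdit(op);
  }
}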

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 29 07:10:48 2011
@@ -40,14 +40,18 @@ import org.apache.hadoop.hdfs.server.com
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.EOFException;
 
@@ -63,6 +67,45 @@ public abstract class FSEditLogOp {
   long txid;
 
 
+  @SuppressWarnings("deprecation")
+  private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
+    new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
+      @Override
+      protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
+        EnumMap<FSEditLogOpCodes, FSEditLogOp> instances 
+          = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+        instances.put(OP_ADD, new AddOp());
+        instances.put(OP_CLOSE, new CloseOp());
+        instances.put(OP_SET_REPLICATION, new SetReplicationOp());
+        instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+        instances.put(OP_RENAME_OLD, new RenameOldOp());
+        instances.put(OP_DELETE, new DeleteOp());
+        instances.put(OP_MKDIR, new MkdirOp());
+        instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+        instances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+        instances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+        instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+        instances.put(OP_SET_OWNER, new SetOwnerOp());
+        instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+        instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+        instances.put(OP_SET_QUOTA, new SetQuotaOp());
+        instances.put(OP_TIMES, new TimesOp());
+        instances.put(OP_SYMLINK, new SymlinkOp());
+        instances.put(OP_RENAME, new RenameOp());
+        instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+        instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+        instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+        instances.put(OP_CANCEL_DELEGATION_TOKEN, 
+                      new CancelDelegationTokenOp());
+        instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+        instances.put(OP_START_LOG_SEGMENT,
+                      new LogSegmentOp(OP_START_LOG_SEGMENT));
+        instances.put(OP_END_LOG_SEGMENT,
+                      new LogSegmentOp(OP_END_LOG_SEGMENT));
+        return instances;
+      }
+  };
+
   /**
    * Constructor for an EditLog Op. EditLog ops cannot be constructed
    * directly, but only through Reader#readOp.
@@ -76,10 +119,14 @@ public abstract class FSEditLogOp {
     this.txid = txid;
   }
 
-  public abstract void readFields(DataInputStream in, int logVersion)
+  abstract void readFields(DataInputStream in, int logVersion)
       throws IOException;
 
-  static class AddCloseOp extends FSEditLogOp {
+  abstract void writeFields(DataOutputStream out)
+      throws IOException;
+
+  @SuppressWarnings("unchecked")
+  static abstract class AddCloseOp extends FSEditLogOp {
     int length;
     String path;
     short replication;
@@ -97,7 +144,71 @@ public abstract class FSEditLogOp {
       assert(opCode == OP_ADD || opCode == OP_CLOSE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    <T extends AddCloseOp> T setPath(String path) {
+      this.path = path;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setReplication(short replication) {
+      this.replication = replication;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setAccessTime(long atime) {
+      this.atime = atime;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setBlockSize(long blockSize) {
+      this.blockSize = blockSize;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setBlocks(Block[] blocks) {
+      this.blocks = blocks;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
+      this.permissions = permissions;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setClientName(String clientName) {
+      this.clientName = clientName;
+      return (T)this;
+    }
+
+    <T extends AddCloseOp> T setClientMachine(String clientMachine) {
+      this.clientMachine = clientMachine;
+      return (T)this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path), 
+        toLogReplication(replication),
+        toLogLong(mtime),
+        toLogLong(atime),
+        toLogLong(blockSize)};
+      new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair).write(out);
+      new ArrayWritable(Block.class, blocks).write(out);
+      permissions.write(out);
+
+      if (this.opCode == OP_ADD) {
+        new DeprecatedUTF8(clientName).write(out);
+        new DeprecatedUTF8(clientMachine).write(out);
+      }
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       // versions > 0 support per file replication
       // get name and replication
@@ -178,6 +289,26 @@ public abstract class FSEditLogOp {
     }
   }
 
+  static class AddOp extends AddCloseOp {
+    private AddOp() {
+      super(OP_ADD);
+    }
+
+    static AddOp getInstance() {
+      return (AddOp)opInstances.get().get(OP_ADD);
+    }
+  }
+
+  static class CloseOp extends AddCloseOp {
+    private CloseOp() {
+      super(OP_CLOSE);
+    }
+
+    static CloseOp getInstance() {
+      return (CloseOp)opInstances.get().get(OP_CLOSE);
+    }
+  }
+
   static class SetReplicationOp extends FSEditLogOp {
     String path;
     short replication;
@@ -186,7 +317,29 @@ public abstract class FSEditLogOp {
       super(OP_SET_REPLICATION);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetReplicationOp getInstance() {
+      return (SetReplicationOp)opInstances.get()
+        .get(OP_SET_REPLICATION);
+    }
+
+    SetReplicationOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SetReplicationOp setReplication(short replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(path).write(out);
+      new DeprecatedUTF8(Short.toString(replication)).write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.path = FSImageSerialization.readString(in);
       this.replication = readShort(in);
@@ -203,7 +356,41 @@ public abstract class FSEditLogOp {
       super(OP_CONCAT_DELETE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static ConcatDeleteOp getInstance() {
+      return (ConcatDeleteOp)opInstances.get()
+        .get(OP_CONCAT_DELETE);
+    }
+
+    ConcatDeleteOp setTarget(String trg) {
+      this.trg = trg;
+      return this;
+    }
+
+    ConcatDeleteOp setSources(String[] srcs) {
+      this.srcs = srcs;
+      return this;
+    }
+
+    ConcatDeleteOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+      int idx = 0;
+      info[idx++] = new DeprecatedUTF8(trg);
+      for(int i=0; i<srcs.length; i++) {
+        info[idx++] = new DeprecatedUTF8(srcs[i]);
+      }
+      info[idx] = toLogLong(timestamp);
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (length < 3) { // trg, srcs.., timestam
@@ -230,7 +417,37 @@ public abstract class FSEditLogOp {
       super(OP_RENAME_OLD);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenameOldOp getInstance() {
+      return (RenameOldOp)opInstances.get()
+        .get(OP_RENAME_OLD);
+    }
+
+    RenameOldOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    RenameOldOp setDestination(String dst) {
+      this.dst = dst;
+      return this;
+    }
+
+    RenameOldOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(src),
+        new DeprecatedUTF8(dst),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (this.length != 3) {
@@ -252,9 +469,32 @@ public abstract class FSEditLogOp {
       super(OP_DELETE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
-        throws IOException {
+    static DeleteOp getInstance() {
+      return (DeleteOp)opInstances.get()
+        .get(OP_DELETE);
+    }
+
+    DeleteOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
 
+    DeleteOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
       this.length = in.readInt();
       if (this.length != 2) {
         throw new IOException("Incorrect data format. "
@@ -274,8 +514,40 @@ public abstract class FSEditLogOp {
     private MkdirOp() {
       super(OP_MKDIR);
     }
+    
+    static MkdirOp getInstance() {
+      return (MkdirOp)opInstances.get()
+        .get(OP_MKDIR);
+    }
 
-    public void readFields(DataInputStream in, int logVersion)
+    MkdirOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    MkdirOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    MkdirOp setPermissionStatus(PermissionStatus permissions) {
+      this.permissions = permissions;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+        new DeprecatedUTF8(path),
+        toLogLong(timestamp), // mtime
+        toLogLong(timestamp) // atime, unused at this time
+      };
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      permissions.write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
 
       this.length = in.readInt();
@@ -309,32 +581,70 @@ public abstract class FSEditLogOp {
       super(OP_SET_GENSTAMP);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetGenstampOp getInstance() {
+      return (SetGenstampOp)opInstances.get()
+        .get(OP_SET_GENSTAMP);
+    }
+
+    SetGenstampOp setGenerationStamp(long genStamp) {
+      this.genStamp = genStamp;
+      return this;
+    }
+    
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new LongWritable(genStamp).write(out);
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.genStamp = in.readLong();
     }
   }
 
+  @SuppressWarnings("deprecation")
   static class DatanodeAddOp extends FSEditLogOp {
-    @SuppressWarnings("deprecation")
     private DatanodeAddOp() {
       super(OP_DATANODE_ADD);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static DatanodeAddOp getInstance() {
+      return (DatanodeAddOp)opInstances.get()
+        .get(OP_DATANODE_ADD);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated, should not write");
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       //Datanodes are not persistent any more.
       FSImageSerialization.DatanodeImage.skipOne(in);
     }
   }
 
+  @SuppressWarnings("deprecation")
   static class DatanodeRemoveOp extends FSEditLogOp {
-    @SuppressWarnings("deprecation")
     private DatanodeRemoveOp() {
       super(OP_DATANODE_REMOVE);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static DatanodeRemoveOp getInstance() {
+      return (DatanodeRemoveOp)opInstances.get()
+        .get(OP_DATANODE_REMOVE);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated, should not write");
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       DatanodeID nodeID = new DatanodeID();
       nodeID.readFields(in);
@@ -350,7 +660,29 @@ public abstract class FSEditLogOp {
       super(OP_SET_PERMISSIONS);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetPermissionsOp getInstance() {
+      return (SetPermissionsOp)opInstances.get()
+        .get(OP_SET_PERMISSIONS);
+    }
+
+    SetPermissionsOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetPermissionsOp setPermissions(FsPermission permissions) {
+      this.permissions = permissions;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(src).write(out);
+      permissions.write(out);
+     }
+ 
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.permissions = FsPermission.read(in);
@@ -366,13 +698,42 @@ public abstract class FSEditLogOp {
       super(OP_SET_OWNER);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetOwnerOp getInstance() {
+      return (SetOwnerOp)opInstances.get()
+        .get(OP_SET_OWNER);
+    }
+
+    SetOwnerOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetOwnerOp setUser(String username) {
+      this.username = username;
+      return this;
+    }
+
+    SetOwnerOp setGroup(String groupname) {
+      this.groupname = groupname;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
+      DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
+      new DeprecatedUTF8(src).write(out);
+      u.write(out);
+      g.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.username = FSImageSerialization.readString_EmptyAsNull(in);
       this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
     }
-
   }
 
   static class SetNSQuotaOp extends FSEditLogOp {
@@ -383,7 +744,18 @@ public abstract class FSEditLogOp {
       super(OP_SET_NS_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetNSQuotaOp getInstance() {
+      return (SetNSQuotaOp)opInstances.get()
+        .get(OP_SET_NS_QUOTA);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated");      
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.nsQuota = readLongWritable(in);
@@ -397,7 +769,18 @@ public abstract class FSEditLogOp {
       super(OP_CLEAR_NS_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static ClearNSQuotaOp getInstance() {
+      return (ClearNSQuotaOp)opInstances.get()
+        .get(OP_CLEAR_NS_QUOTA);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      throw new IOException("Deprecated");      
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
     }
@@ -412,7 +795,35 @@ public abstract class FSEditLogOp {
       super(OP_SET_QUOTA);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SetQuotaOp getInstance() {
+      return (SetQuotaOp)opInstances.get()
+        .get(OP_SET_QUOTA);
+    }
+
+    SetQuotaOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetQuotaOp setNSQuota(long nsQuota) {
+      this.nsQuota = nsQuota;
+      return this;
+    }
+
+    SetQuotaOp setDSQuota(long dsQuota) {
+      this.dsQuota = dsQuota;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(src).write(out);
+      new LongWritable(nsQuota).write(out);
+      new LongWritable(dsQuota).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.src = FSImageSerialization.readString(in);
       this.nsQuota = readLongWritable(in);
@@ -430,7 +841,37 @@ public abstract class FSEditLogOp {
       super(OP_TIMES);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static TimesOp getInstance() {
+      return (TimesOp)opInstances.get()
+        .get(OP_TIMES);
+    }
+
+    TimesOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    TimesOp setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return this;
+    }
+
+    TimesOp setAccessTime(long atime) {
+      this.atime = atime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        toLogLong(mtime),
+        toLogLong(atime)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (length != 3) {
@@ -455,7 +896,49 @@ public abstract class FSEditLogOp {
       super(OP_SYMLINK);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static SymlinkOp getInstance() {
+      return (SymlinkOp)opInstances.get()
+        .get(OP_SYMLINK);
+    }
+
+    SymlinkOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SymlinkOp setValue(String value) {
+      this.value = value;
+      return this;
+    }
+
+    SymlinkOp setModificationTime(long mtime) {
+      this.mtime = mtime;
+      return this;
+    }
+
+    SymlinkOp setAccessTime(long atime) {
+      this.atime = atime;
+      return this;
+    }
+
+    SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
+      this.permissionStatus = permissionStatus;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(path),
+        new DeprecatedUTF8(value),
+        toLogLong(mtime),
+        toLogLong(atime)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      permissionStatus.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
 
       this.length = in.readInt();
@@ -482,7 +965,43 @@ public abstract class FSEditLogOp {
       super(OP_RENAME);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenameOp getInstance() {
+      return (RenameOp)opInstances.get()
+        .get(OP_RENAME);
+    }
+
+    RenameOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    RenameOp setDestination(String dst) {
+      this.dst = dst;
+      return this;
+    }
+    
+    RenameOp setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+    
+    RenameOp setOptions(Rename[] options) {
+      this.options = options;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+        new DeprecatedUTF8(src),
+        new DeprecatedUTF8(dst),
+        toLogLong(timestamp)};
+      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+      toBytesWritable(options).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.length = in.readInt();
       if (this.length != 3) {
@@ -507,6 +1026,14 @@ public abstract class FSEditLogOp {
       }
       return options;
     }
+
+    static BytesWritable toBytesWritable(Rename... options) {
+      byte[] bytes = new byte[options.length];
+      for (int i = 0; i < options.length; i++) {
+        bytes[i] = options[i].value();
+      }
+      return new BytesWritable(bytes);
+    }
   }
 
   static class ReassignLeaseOp extends FSEditLogOp {
@@ -517,8 +1044,36 @@ public abstract class FSEditLogOp {
     private ReassignLeaseOp() {
       super(OP_REASSIGN_LEASE);
     }
-    
-    public void readFields(DataInputStream in, int logVersion)
+
+    static ReassignLeaseOp getInstance() {
+      return (ReassignLeaseOp)opInstances.get()
+        .get(OP_REASSIGN_LEASE);
+    }
+
+    ReassignLeaseOp setLeaseHolder(String leaseHolder) {
+      this.leaseHolder = leaseHolder;
+      return this;
+    }
+
+    ReassignLeaseOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    ReassignLeaseOp setNewHolder(String newHolder) {
+      this.newHolder = newHolder;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      new DeprecatedUTF8(leaseHolder).write(out);
+      new DeprecatedUTF8(path).write(out);
+      new DeprecatedUTF8(newHolder).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.leaseHolder = FSImageSerialization.readString(in);
       this.path = FSImageSerialization.readString(in);
@@ -534,7 +1089,30 @@ public abstract class FSEditLogOp {
       super(OP_GET_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static GetDelegationTokenOp getInstance() {
+      return (GetDelegationTokenOp)opInstances.get()
+        .get(OP_GET_DELEGATION_TOKEN);
+    }
+
+    GetDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    GetDelegationTokenOp setExpiryTime(long expiryTime) {
+      this.expiryTime = expiryTime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+      toLogLong(expiryTime).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -550,7 +1128,30 @@ public abstract class FSEditLogOp {
       super(OP_RENEW_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static RenewDelegationTokenOp getInstance() {
+      return (RenewDelegationTokenOp)opInstances.get()
+          .get(OP_RENEW_DELEGATION_TOKEN);
+    }
+
+    RenewDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    RenewDelegationTokenOp setExpiryTime(long expiryTime) {
+      this.expiryTime = expiryTime;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+      toLogLong(expiryTime).write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -565,7 +1166,24 @@ public abstract class FSEditLogOp {
       super(OP_CANCEL_DELEGATION_TOKEN);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static CancelDelegationTokenOp getInstance() {
+      return (CancelDelegationTokenOp)opInstances.get()
+          .get(OP_CANCEL_DELEGATION_TOKEN);
+    }
+
+    CancelDelegationTokenOp setDelegationTokenIdentifier(
+        DelegationTokenIdentifier token) {
+      this.token = token;
+      return this;
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      token.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
@@ -579,7 +1197,23 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_MASTER_KEY);
     }
 
-    public void readFields(DataInputStream in, int logVersion)
+    static UpdateMasterKeyOp getInstance() {
+      return (UpdateMasterKeyOp)opInstances.get()
+          .get(OP_UPDATE_MASTER_KEY);
+    }
+
+    UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
+      this.key = key;
+      return this;
+    }
+    
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+      key.write(out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.key = new DelegationKey();
       this.key.readFields(in);
@@ -593,10 +1227,39 @@ public abstract class FSEditLogOp {
              code == OP_END_LOG_SEGMENT : "Bad op: " + code;
     }
 
+    static LogSegmentOp getInstance(FSEditLogOpCodes code) {
+      return (LogSegmentOp)opInstances.get().get(code);
+    }
+
     public void readFields(DataInputStream in, int logVersion)
         throws IOException {
       // no data stored in these ops yet
     }
+
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      // no data stored
+    }
+  }
+
+  static class InvalidOp extends FSEditLogOp {
+    private InvalidOp() {
+      super(OP_INVALID);
+    }
+
+    static InvalidOp getInstance() {
+      return (InvalidOp)opInstances.get().get(OP_INVALID);
+    }
+
+    @Override 
+    void writeFields(DataOutputStream out) throws IOException {
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      // nothing to read
+    }
   }
 
   static private short readShort(DataInputStream in) throws IOException {
@@ -607,6 +1270,14 @@ public abstract class FSEditLogOp {
     return Long.parseLong(FSImageSerialization.readString(in));
   }
 
+  static private DeprecatedUTF8 toLogReplication(short replication) {
+    return new DeprecatedUTF8(Short.toString(replication));
+  }
+  
+  static private DeprecatedUTF8 toLogLong(long timestamp) {
+    return new DeprecatedUTF8(Long.toString(timestamp));
+  }
+
   /**
    * A class to read in blocks stored in the old format. The only two
    * fields in the block were blockid and length.
@@ -710,13 +1381,43 @@ public abstract class FSEditLogOp {
   }
 
   /**
+   * Class for writing editlog ops
+   */
+  public static class Writer {
+    private final DataOutputBuffer buf;
+
+    public Writer(DataOutputBuffer out) {
+      this.buf = out;
+    }
+
+    /**
+     * Write an operation to the output stream
+     * 
+     * @param op The operation to write
+     * @throws IOException if an error occurs during writing.
+     */
+    public void writeOp(FSEditLogOp op) throws IOException {
+      int start = buf.getLength();
+      buf.writeByte(op.opCode.getOpCode());
+      buf.writeLong(op.txid);
+      op.writeFields(buf);
+      int end = buf.getLength();
+      Checksum checksum = FSEditLog.getChecksum();
+      checksum.reset();
+      checksum.update(buf.getData(), start, end-start);
+      int sum = (int)checksum.getValue();
+      buf.writeInt(sum);
+    }
+  }
+
+  /**
    * Class for reading editlog ops from a stream
    */
   public static class Reader {
     private final DataInputStream in;
     private final int logVersion;
     private final Checksum checksum;
-    private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+
     /**
      * Construct the reader
      * @param in The stream to read from.
@@ -734,36 +1435,6 @@ public abstract class FSEditLogOp {
       }
       this.logVersion = logVersion;
       this.checksum = checksum;
-      opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
-          FSEditLogOpCodes.class);
-      opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
-      opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
-      opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
-      opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-      opInstances.put(OP_RENAME_OLD, new RenameOldOp());
-      opInstances.put(OP_DELETE, new DeleteOp());
-      opInstances.put(OP_MKDIR, new MkdirOp());
-      opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
-      opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
-      opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
-      opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-      opInstances.put(OP_SET_OWNER, new SetOwnerOp());
-      opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-      opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-      opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
-      opInstances.put(OP_TIMES, new TimesOp());
-      opInstances.put(OP_SYMLINK, new SymlinkOp());
-      opInstances.put(OP_RENAME, new RenameOp());
-      opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-      opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-      opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-      opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
-                      new CancelDelegationTokenOp());
-      opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-      opInstances.put(OP_START_LOG_SEGMENT,
-                      new LogSegmentOp(OP_START_LOG_SEGMENT));
-      opInstances.put(OP_END_LOG_SEGMENT,
-                      new LogSegmentOp(OP_END_LOG_SEGMENT));
     }
 
     /**
@@ -796,7 +1467,7 @@ public abstract class FSEditLogOp {
         return null;
       }
 
-      FSEditLogOp op = opInstances.get(opCode);
+      FSEditLogOp op = opInstances.get().get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
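
For reference, the new builder-style ops and the Writer class above are meant to be used together: a thread-local opInstances map hands out a reusable per-opcode instance, the chained setters fill in its fields, and Writer.writeOp() frames the record as opcode, txid, per-op fields and a trailing checksum. A minimal usage sketch, assuming caller code in the same org.apache.hadoop.hdfs.server.namenode package (the op classes and their getInstance() factories are package-private); the path and times are made-up values, and the op's txid is assumed to have been assigned by the caller before writeOp() emits it:

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.IOException;

    import org.apache.hadoop.io.DataOutputBuffer;

    // Sketch only, not part of this commit: serialize an OP_TIMES record
    // using the thread-local op instances and FSEditLogOp.Writer.
    class FSEditLogOpWriteSketch {
      static byte[] serializeTimesOp(String path, long mtime, long atime)
          throws IOException {
        // Reuse the per-thread TimesOp instance and populate it through
        // the chained setters added in this change. The op's txid is left
        // at whatever the caller assigned (assumption: FSEditLog normally
        // sets it before logging).
        FSEditLogOp.TimesOp op = FSEditLogOp.TimesOp.getInstance()
            .setPath(path)
            .setModificationTime(mtime)
            .setAccessTime(atime);

        // Writer.writeOp() appends opcode, txid, the op's fields and a
        // checksum of the record to the buffer (see writeOp() above).
        DataOutputBuffer buf = new DataOutputBuffer();
        new FSEditLogOp.Writer(buf).writeOp(op);

        byte[] record = new byte[buf.getLength()];
        System.arraycopy(buf.getData(), 0, record, 0, buf.getLength());
        return record;
      }
    }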

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 29 07:10:48 2011
@@ -295,7 +295,7 @@ public class FSImage implements Closeabl
       StorageDirectory sd = it.next();
       StorageState curState;
       try {
-        curState = sd.analyzeStorage(startOpt);
+        curState = sd.analyzeStorage(startOpt, storage);
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
@@ -311,7 +311,8 @@ public class FSImage implements Closeabl
         }
         if (curState != StorageState.NOT_FORMATTED 
             && startOpt != StartupOption.ROLLBACK) {
-          sd.read(); // read and verify consistency with other directories
+          // read and verify consistency with other directories
+          storage.readProperties(sd);
           isFormatted = true;
         }
         if (startOpt == StartupOption.IMPORT && isFormatted)
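
The same refactoring runs through this file as through BackupImage: per-directory VERSION handling moves off StorageDirectory, so sd.read() and sd.write() become storage.readProperties(sd) and storage.writeProperties(sd), letting the storage object cross-check the properties against its other directories. A minimal sketch of the resulting analyze/recover/read sequence for one directory, assuming it sits in the namenode package next to FSImage, that "storage" is the NNStorage helper used above, and that the import paths below are the right ones on this branch:

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.IOException;

    // Assumed import paths and NNStorage type for this branch; adjust if
    // StartupOption or the storage classes live elsewhere.
    import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
    import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
    import org.apache.hadoop.hdfs.server.common.Storage.StorageState;

    // Sketch only, not part of this commit: the per-directory startup
    // sequence after this change -- analyze, recover if needed, then read
    // the VERSION properties through the storage object.
    class StoragePropertiesSketch {
      static boolean analyzeAndRead(StorageDirectory sd, NNStorage storage,
          StartupOption startOpt) throws IOException {
        StorageState curState = sd.analyzeStorage(startOpt, storage);
        if (curState == StorageState.NON_EXISTENT) {
          throw new IOException("Storage directory " + sd.getRoot()
              + " does not exist or is not accessible");
        }
        if (curState != StorageState.NORMAL
            && curState != StorageState.NOT_FORMATTED) {
          // complete or undo an interrupted upgrade/rollback/finalize
          sd.doRecover(curState);
        }
        if (curState == StorageState.NOT_FORMATTED) {
          return false;   // nothing to read; directory still needs formatting
        }
        if (startOpt != StartupOption.ROLLBACK) {
          storage.readProperties(sd);   // was sd.read() before this change
        }
        return true;
      }
    }
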
@@ -395,7 +396,7 @@ public class FSImage implements Closeabl
       try {
         // Write the version file, since saveFsImage above only makes the
         // fsimage_<txid>, and the directory is otherwise empty.
-        sd.write();
+        storage.writeProperties(sd);
         
         File prevDir = sd.getPreviousDir();
         File tmpDir = sd.getPreviousTmp();
@@ -433,14 +434,14 @@ public class FSImage implements Closeabl
       if (!prevDir.exists()) {  // use current directory then
         LOG.info("Storage directory " + sd.getRoot()
                  + " does not contain previous fs state.");
-        sd.read(); // read and verify consistency with other directories
+        // read and verify consistency with other directories
+        storage.readProperties(sd);
         continue;
       }
-      StorageDirectory sdPrev 
-        = prevState.getStorage().new StorageDirectory(sd.getRoot());
 
       // read and verify consistency of the prev dir
-      sdPrev.read(sdPrev.getPreviousVersionFile());
+      prevState.getStorage().readPreviousVersionProperties(sd);
+
       if (prevState.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
         throw new IOException(
           "Cannot rollback to storage version " +
@@ -604,7 +605,7 @@ public class FSImage implements Closeabl
     //
     StorageDirectory sdForProperties =
       loadPlan.getStorageDirectoryForProperties();
-    sdForProperties.read();
+    storage.readProperties(sdForProperties);
     File imageFile = loadPlan.getImageFile();
 
     try {
@@ -730,7 +731,6 @@ public class FSImage implements Closeabl
     storage.setMostRecentCheckpointTxId(txId);
   }
 
-
   /**
    * Save the contents of the FS image to the file.
    */