You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/08/16 02:06:06 UTC

svn commit: r686420 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/io/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/server/namenode/

Author: shv
Date: Fri Aug 15 17:06:05 2008
New Revision: 686420

URL: http://svn.apache.org/viewvc?rev=686420&view=rev
Log:
HADOOP-3905. Create generic interfaces for edit log streams. Contributed by Konstantin Shvachko.

Added:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 15 17:06:05 2008
@@ -197,6 +197,8 @@
 
     HADOOP-3935. Split out inner classes from DataNode.java. (johan)
 
+    HADOOP-3905. Create generic interfaces for edit log streams. (shv)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java Fri Aug 15 17:06:05 2008
@@ -44,7 +44,6 @@
   private static class Buffer extends ByteArrayOutputStream {
     public byte[] getData() { return buf; }
     public int getLength() { return count; }
-    public void reset() { count = 0; }
 
     public Buffer() {
       super();
@@ -101,4 +100,9 @@
   public void write(DataInput in, int length) throws IOException {
     buffer.write(in, length);
   }
+
+  /** Write the buffered contents to the given output stream. */
+  public void writeTo(OutputStream out) throws IOException {
+    buffer.writeTo(out);
+  }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Aug 15 17:06:05 2008
@@ -484,7 +484,7 @@
    * 2) to receive a registrationID 
    * issued by the namenode to recognize registered datanodes.
    * 
-   * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
+   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
   private void register() throws IOException {

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=686420&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Aug 15 17:06:05 2008
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic abstract class to support journaling of edit log records
+ * into persistent storage.
+ */
+abstract class EditLogOutputStream extends OutputStream {
+  // these are statistics counters
+  private long numSync;        // number of sync(s) to disk
+  private long totalTimeSync;  // total time to sync
+
+  EditLogOutputStream() throws IOException {
+    numSync = totalTimeSync = 0;
+  }
+
+  /**
+   * Get this stream name.
+   * 
+   * @return name of the stream
+   */
+  abstract String getName();
+
+  /** {@inheritDoc} */
+  abstract public void write(int b) throws IOException;
+
+  /**
+   * Write edits log record into the stream.
+   * The record is represented by operation name and
+   * an array of Writable arguments.
+   * 
+   * @param op operation
+   * @param writables array of Writable arguments
+   * @throws IOException
+   */
+  abstract void write(byte op, Writable ... writables) throws IOException;
+
+  /**
+   * Create and initialize new edits log storage.
+   * 
+   * @throws IOException
+   */
+  abstract void create() throws IOException;
+
+  /** {@inheritDoc} */
+  abstract public void close() throws IOException;
+
+  /**
+   * All data that has been written to the stream so far will be flushed.
+   * New data can be still written to the stream while flushing is performed.
+   */
+  abstract void setReadyToFlush() throws IOException;
+
+  /**
+   * Flush and sync all data that is ready to be flushed (see
+   * {@link #setReadyToFlush()}) into the underlying persistent store.
+   * @throws IOException
+   */
+  abstract protected void flushAndSync() throws IOException;
+
+  /**
+   * Flush data to persistent store.
+   * Collect sync metrics.
+   */
+  public void flush() throws IOException {
+    numSync++;
+    long start = FSNamesystem.now();
+    flushAndSync();
+    long end = FSNamesystem.now();
+    totalTimeSync += (end - start);
+  }
+
+  /**
+   * Return the size of the current edits log.
+   */
+  abstract long length() throws IOException;
+
+  /**
+   * Returns the time the edits log stream was last modified. 
+   */
+  abstract long lastModified();
+
+  /**
+   * Return total time spent in {@link #flushAndSync()}
+   */
+  long getTotalSyncTime() {
+    return totalTimeSync;
+  }
+
+  /**
+   * Return number of calls to {@link #flushAndSync()}
+   */
+  long getNumSync() {
+    return numSync;
+  }
+}

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java?rev=686420&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java Fri Aug 15 17:06:05 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A generic abstract class to support reading edits log data from 
+ * persistent storage.
+ * 
+ * It should stream bytes from the storage exactly as they were written
+ * into the {@link EditLogOutputStream}.
+ */
+abstract class EditLogInputStream extends InputStream {
+  /**
+   * Get this stream name.
+   * 
+   * @return name of the stream
+   */
+  abstract String getName();
+
+  /** {@inheritDoc} */
+  public abstract int available() throws IOException;
+
+  /** {@inheritDoc} */
+  public abstract int read() throws IOException;
+
+  /** {@inheritDoc} */
+  public abstract void close() throws IOException;
+
+  /**
+   * Return the size of the current edits log.
+   */
+  abstract long length() throws IOException;
+}

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Aug 15 17:06:05 2008
@@ -21,8 +21,6 @@
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -97,123 +95,148 @@
     }
   };
 
-  static class EditLogOutputStream {
-    private FileChannel fc;
-    private FileOutputStream fp;
-    private DataOutputStream od;
-    private DataOutputStream od1;
-    private DataOutputStream od2;
-    private ByteArrayOutputStream buf1;
-    private ByteArrayOutputStream buf2;
-    private int bufSize;
-
-    // these are statistics counters
-    private long numSync;        // number of syncs to disk
-    private long totalTimeSync;  // total time to sync
-
-    EditLogOutputStream(File name) throws IOException {
-      bufSize = sizeFlushBuffer;
-      buf1 = new ByteArrayOutputStream(bufSize);
-      buf2 = new ByteArrayOutputStream(bufSize);
-      od1 = new DataOutputStream(buf1);
-      od2 = new DataOutputStream(buf2);
-      od = od1;                              // start with first buffer
+  /**
+   * An implementation of the abstract class {@link EditLogOutputStream},
+   * which stores edits in a local file.
+   */
+  static private class EditLogFileOutputStream extends EditLogOutputStream {
+    private File file;
+    private FileOutputStream fp;    // file stream for storing edit logs 
+    private FileChannel fc;         // channel of the file stream for sync
+    private DataOutputBuffer bufCurrent;  // current buffer for writing
+    private DataOutputBuffer bufReady;    // buffer ready for flushing
+
+    EditLogFileOutputStream(File name) throws IOException {
+      super();
+      file = name;
+      bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
+      bufReady = new DataOutputBuffer(sizeFlushBuffer);
       fp = new FileOutputStream(name, true); // open for append
       fc = fp.getChannel();
-      numSync = totalTimeSync = 0;
     }
 
-    // returns the current output stream
-    DataOutputStream getOutputStream() {
-      return od;
-    }
-
-    void flushAndSync() throws IOException {
-      this.flush();
-      fc.force(true);
+    @Override
+    String getName() {
+      return file.getPath();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(int b) throws IOException {
+      bufCurrent.write(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    void write(byte op, Writable ... writables) throws IOException {
+      write(op);
+      for(Writable w : writables) {
+        w.write(bufCurrent);
+      }
     }
 
+    /**
+     * Create an empty edits log file.
+     */
+    @Override
     void create() throws IOException {
       fc.truncate(0);
-      od.writeInt(FSConstants.LAYOUT_VERSION);
-      flushAndSync();
+      bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+      setReadyToFlush();
+      flush();
     }
 
-    // flush current buffer
-    private void flush() throws IOException {
-      ByteArrayOutputStream buf = getBuffer();
-      if (buf.size() == 0) {
-        return;                // no data to flush
-      }
-      buf.writeTo(fp);         // write data to file
-      buf.reset();             // erase all data in buf
-    }
-
-    void close() throws IOException {
+    @Override
+    public void close() throws IOException {
       // close should have been called after all pending transactions 
       // have been flushed & synced.
-      if (getBufSize() != 0) {
-        throw new IOException("FSEditStream has " + getBufSize() +
+      int bufSize = bufCurrent.size();
+      if (bufSize != 0) {
+        throw new IOException("FSEditStream has " + bufSize +
                               " bytes still to be flushed and cannot " +
-                              "closed.");
+                              "be closed.");
       } 
-      od.close();
+      bufCurrent.close();
+      bufReady.close();
       fp.close();
-      buf1 = buf2 = null;
-      od = od1 = od2 = null;
+      bufCurrent = bufReady = null;
     }
 
-    // returns the amount of data in the buffer
-    int getBufSize() {
-      return getBuffer().size();
+    /**
+     * All data that has been written to the stream so far will be flushed.
+     * New data can be still written to the stream while flushing is performed.
+     */
+    @Override
+    void setReadyToFlush() {
+      assert bufReady.size() == 0 : "previous data is not flushed yet";
+      DataOutputBuffer tmp = bufReady;
+      bufReady = bufCurrent;
+      bufCurrent = tmp;
     }
 
-    // get the current buffer
-    private ByteArrayOutputStream getBuffer() {
-      if (od == od1) {
-        return buf1;
-      } else {
-        return buf2;
-      }
+    /**
+     * Flush ready buffer to persistent store.
+     * bufCurrent is not flushed as it accumulates new log records
+     * while bufReady will be flushed and synced.
+     */
+    @Override
+    protected void flushAndSync() throws IOException {
+      bufReady.writeTo(fp);     // write data to file
+      bufReady.reset();         // erase all data in the buffer
+      fc.force(true);           // sync to persistent store
     }
 
-    //
-    // Flush current buffer to output stream, swap buffers
-    // This is protected by the flushLock.
-    //
-    void swap() {
-      if (od == od1) {
-        od = od2;
-      } else {
-        od = od1;
-      }
+    /**
+     * Return the size of the current edit log including buffered data.
+     */
+    @Override
+    long length() throws IOException {
+      // file size + size of both buffers
+      return fc.size() + bufReady.size() + bufCurrent.size();
+    }
+    
+    /**
+     * Returns the time the edits log file was last modified. 
+     */
+    @Override
+    long lastModified() {
+      return file.lastModified();
     }
+  }
 
-    //
-    // Flush old buffer to persistent store
-    //
-    void flushAndSyncOld() throws IOException {
-      numSync++;
-      ByteArrayOutputStream oldbuf;
-      if (od == od1) {
-        oldbuf = buf2;
-      } else {
-        oldbuf = buf1;
-      }
-      long start = FSNamesystem.now();
-      oldbuf.writeTo(fp);         // write data to file
-      oldbuf.reset();             // erase all data in buf
-      fc.force(true);             // sync to persistent store
-      long end = FSNamesystem.now();
-      totalTimeSync += (end - start);
+  static class EditLogFileInputStream extends EditLogInputStream {
+    private File file;
+    private FileInputStream fStream;
+
+    EditLogFileInputStream(File name) throws IOException {
+      file = name;
+      fStream = new FileInputStream(name);
+    }
+
+    @Override
+    String getName() {
+      return file.getPath();
     }
 
-    long getTotalSyncTime() {
-      return totalTimeSync;
+    @Override
+    public int available() throws IOException {
+      return fStream.available();
     }
 
-    long getNumSync() {
-      return numSync;
+    @Override
+    public int read() throws IOException {
+      return fStream.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+      fStream.close();
+    }
+
+    @Override
+    long length() throws IOException {
+      // size of the underlying edits file
+      return file.length();
     }
   }
 
@@ -258,24 +281,24 @@
     for (int idx = 0; idx < size; idx++) {
       File eFile = getEditFile(idx);
       try {
-        EditLogOutputStream eStream = new EditLogOutputStream(eFile);
+        EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
         editStreams.add(eStream);
       } catch (IOException e) {
         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
         fsimage.processIOError(idx);
-        idx--; 
+        idx--;
       }
     }
   }
 
   public synchronized void createEditLogFile(File name) throws IOException {
-    EditLogOutputStream eStream = new EditLogOutputStream(name);
+    EditLogOutputStream eStream = new EditLogFileOutputStream(name);
     eStream.create();
     eStream.close();
   }
 
   /**
-   * Create edits.new if non existant.
+   * Create edits.new if non existent.
    */
   synchronized void createNewIfMissing() throws IOException {
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
@@ -286,7 +309,7 @@
   }
   
   /**
-   * Shutdown the filestore
+   * Shutdown the file store.
    */
   public synchronized void close() throws IOException {
     while (isSyncRunning) {
@@ -304,7 +327,8 @@
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
-        eStream.flushAndSync();
+        eStream.setReadyToFlush();
+        eStream.flush();
         eStream.close();
       } catch (IOException e) {
         processIOError(idx);
@@ -384,7 +408,7 @@
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  int loadFSEdits(File edits) throws IOException {
+  static int loadFSEdits(EditLogInputStream edits) throws IOException {
     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
@@ -399,275 +423,272 @@
         numOpOther = 0;
     long startTime = FSNamesystem.now();
 
-    if (edits != null) {
-      DataInputStream in = new DataInputStream(new BufferedInputStream(
-                                new FileInputStream(edits)));
+    DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
+    try {
+      // Read log file version. Could be missing. 
+      in.mark(4);
+      // If edits log is greater than 2G, available method will return negative
+      // numbers, so we avoid having to call available
+      boolean available = true;
       try {
-        // Read log file version. Could be missing. 
-        in.mark(4);
-        // If edits log is greater than 2G, available method will return negative
-        // numbers, so we avoid having to call available
-        boolean available = true;
+        logVersion = in.readByte();
+      } catch (EOFException e) {
+        available = false;
+      }
+      if (available) {
+        in.reset();
+        logVersion = in.readInt();
+        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+          throw new IOException(
+                          "Unexpected version of the file system log file: "
+                          + logVersion + ". Current version = " 
+                          + FSConstants.LAYOUT_VERSION + ".");
+      }
+      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+                            "Unsupported version " + logVersion;
+
+      while (true) {
+        long timestamp = 0;
+        long mtime = 0;
+        long blockSize = 0;
+        byte opcode = -1;
         try {
-          logVersion = in.readByte();
+          opcode = in.readByte();
         } catch (EOFException e) {
-          available = false;
+          break; // no more transactions
         }
-        if (available) {
-          in.reset();
-          logVersion = in.readInt();
-          if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-            throw new IOException(
-                            "Unexpected version of the file system log file: "
-                            + logVersion + ". Current version = " 
-                            + FSConstants.LAYOUT_VERSION + ".");
-        }
-        assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-                              "Unsupported version " + logVersion;
-  
-        while (true) {
-          long timestamp = 0;
-          long mtime = 0;
-          long blockSize = 0;
-          byte opcode = -1;
-          try {
-            opcode = in.readByte();
-          } catch (EOFException e) {
-            break; // no more transactions
+        numEdits++;
+        switch (opcode) {
+        case OP_ADD:
+        case OP_CLOSE: {
+          // versions > 0 support per file replication
+          // get name and replication
+          int length = in.readInt();
+          if (-7 == logVersion && length != 3||
+              logVersion < -7 && length != 4) {
+              throw new IOException("Incorrect data format."  +
+                                    " logVersion is " + logVersion +
+                                    " but writables.length is " +
+                                    length + ". ");
           }
-          numEdits++;
-          switch (opcode) {
-          case OP_ADD:
-          case OP_CLOSE: {
-            // versions > 0 support per file replication
-            // get name and replication
-            int length = in.readInt();
-            if (-7 == logVersion && length != 3||
-                logVersion < -7 && length != 4) {
-                throw new IOException("Incorrect data format."  +
-                                      " logVersion is " + logVersion +
-                                      " but writables.length is " +
-                                      length + ". ");
-            }
-            path = FSImage.readString(in);
-            short replication = adjustReplication(readShort(in));
-            mtime = readLong(in);
-            if (logVersion < -7) {
-              blockSize = readLong(in);
-            }
-            // get blocks
-            Block blocks[] = null;
-            if (logVersion <= -14) {
-              blocks = readBlocks(in);
-            } else {
-              BlockTwo oldblk = new BlockTwo();
-              int num = in.readInt();
-              blocks = new Block[num];
-              for (int i = 0; i < num; i++) {
-                oldblk.readFields(in);
-                blocks[i] = new Block(oldblk.blkid, oldblk.len, 
-                                      Block.GRANDFATHER_GENERATION_STAMP);
-              }
-            }
-
-            // Older versions of HDFS does not store the block size in inode.
-            // If the file has more than one block, use the size of the
-            // first block as the blocksize. Otherwise use the default
-            // block size.
-            if (-8 <= logVersion && blockSize == 0) {
-              if (blocks.length > 1) {
-                blockSize = blocks[0].getNumBytes();
-              } else {
-                long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
-              }
-            }
-             
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
+          path = FSImage.readString(in);
+          short replication = adjustReplication(readShort(in));
+          mtime = readLong(in);
+          if (logVersion < -7) {
+            blockSize = readLong(in);
+          }
+          // get blocks
+          Block blocks[] = null;
+          if (logVersion <= -14) {
+            blocks = readBlocks(in);
+          } else {
+            BlockTwo oldblk = new BlockTwo();
+            int num = in.readInt();
+            blocks = new Block[num];
+            for (int i = 0; i < num; i++) {
+              oldblk.readFields(in);
+              blocks[i] = new Block(oldblk.blkid, oldblk.len, 
+                                    Block.GRANDFATHER_GENERATION_STAMP);
             }
+          }
 
-            // clientname, clientMachine and block locations of last block.
-            if (opcode == OP_ADD && logVersion <= -12) {
-              clientName = FSImage.readString(in);
-              clientMachine = FSImage.readString(in);
-              if (-13 <= logVersion) {
-                readDatanodeDescriptorArray(in);
-              }
+          // Older versions of HDFS do not store the block size in inode.
+          // If the file has more than one block, use the size of the
+          // first block as the blocksize. Otherwise use the default
+          // block size.
+          if (-8 <= logVersion && blockSize == 0) {
+            if (blocks.length > 1) {
+              blockSize = blocks[0].getNumBytes();
             } else {
-              clientName = "";
-              clientMachine = "";
-            }
-  
-            // The open lease transaction re-creates a file if necessary.
-            // Delete the file if it already exists.
-            if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG.debug(opcode + ": " + path + 
-                                     " numblocks : " + blocks.length +
-                                     " clientHolder " +  clientName +
-                                     " clientMachine " + clientMachine);
+              long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+              blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
             }
+          }
+           
+          PermissionStatus permissions = fsNamesys.getUpgradePermission();
+          if (logVersion <= -11) {
+            permissions = PermissionStatus.read(in);
+          }
 
-            old = fsDir.unprotectedDelete(path, mtime);
-
-            // add to the file tree
-            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                      path, permissions,
-                                                      blocks, replication, 
-                                                      mtime, blockSize);
-            if (opcode == OP_ADD) {
-              numOpAdd++;
-              //
-              // Replace current node with a INodeUnderConstruction.
-              // Recreate in-memory lease record.
-              //
-              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(), 
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        clientName, 
-                                        clientMachine, 
-                                        null);
-              fsDir.replaceNode(path, node, cons);
-              fsNamesys.leaseManager.addLease(cons.clientName, path);
-            } else if (opcode == OP_CLOSE) {
-              //
-              // Remove lease if it exists.
-              //
-              if (old.isUnderConstruction()) {
-                INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
-                                                     old;
-                fsNamesys.leaseManager.removeLease(cons.clientName, path);
-              }
+          // clientname, clientMachine and block locations of last block.
+          if (opcode == OP_ADD && logVersion <= -12) {
+            clientName = FSImage.readString(in);
+            clientMachine = FSImage.readString(in);
+            if (-13 <= logVersion) {
+              readDatanodeDescriptorArray(in);
             }
-            break;
-          } 
-          case OP_SET_REPLICATION: {
-            numOpSetRepl++;
-            path = FSImage.readString(in);
-            short replication = adjustReplication(readShort(in));
-            fsDir.unprotectedSetReplication(path, replication, null);
-            break;
-          } 
-          case OP_RENAME: {
-            numOpRename++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String s = FSImage.readString(in);
-            String d = FSImage.readString(in);
-            timestamp = readLong(in);
-            DFSFileInfo dinfo = fsDir.getFileInfo(d);
-            fsDir.unprotectedRenameTo(s, d, timestamp);
-            fsNamesys.changeLease(s, d, dinfo);
-            break;
+          } else {
+            clientName = "";
+            clientMachine = "";
           }
-          case OP_DELETE: {
-            numOpDelete++;
-            int length = in.readInt();
-            if (length != 2) {
-              throw new IOException("Incorrect data format. " 
-                                    + "delete operation.");
-            }
-            path = FSImage.readString(in);
-            timestamp = readLong(in);
-            old = fsDir.unprotectedDelete(path, timestamp);
-            if (old != null && old.isUnderConstruction()) {
-              INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
-              fsNamesys.leaseManager.removeLease(cons.clientName, path);
-            }
-            break;
+
+          // The open lease transaction re-creates a file if necessary.
+          // Delete the file if it already exists.
+          if (FSNamesystem.LOG.isDebugEnabled()) {
+            FSNamesystem.LOG.debug(opcode + ": " + path + 
+                                   " numblocks : " + blocks.length +
+                                   " clientHolder " +  clientName +
+                                   " clientMachine " + clientMachine);
           }
-          case OP_MKDIR: {
-            numOpMkDir++;
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            int length = in.readInt();
-            if (length != 2) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            path = FSImage.readString(in);
-            timestamp = readLong(in);
 
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
+          old = fsDir.unprotectedDelete(path, mtime);
+
+          // add to the file tree
+          INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                                                    path, permissions,
+                                                    blocks, replication, 
+                                                    mtime, blockSize);
+          if (opcode == OP_ADD) {
+            numOpAdd++;
+            //
+            // Replace current node with a INodeUnderConstruction.
+            // Recreate in-memory lease record.
+            //
+            INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                      node.getLocalNameBytes(),
+                                      node.getReplication(), 
+                                      node.getModificationTime(),
+                                      node.getPreferredBlockSize(),
+                                      node.getBlocks(),
+                                      node.getPermissionStatus(),
+                                      clientName, 
+                                      clientMachine, 
+                                      null);
+            fsDir.replaceNode(path, node, cons);
+            fsNamesys.leaseManager.addLease(cons.clientName, path);
+          } else if (opcode == OP_CLOSE) {
+            //
+            // Remove lease if it exists.
+            //
+            if (old.isUnderConstruction()) {
+              INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
+                                                   old;
+              fsNamesys.leaseManager.removeLease(cons.clientName, path);
             }
-            fsDir.unprotectedMkdir(path, permissions, timestamp);
-            break;
-          }
-          case OP_SET_GENSTAMP: {
-            numOpSetGenStamp++;
-            long lw = in.readLong();
-            fsDir.namesystem.setGenerationStamp(lw);
-            break;
-          } 
-          case OP_DATANODE_ADD: {
-            numOpOther++;
-            FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
-            nodeimage.readFields(in);
-            //Datnodes are not persistent any more.
-            break;
           }
-          case OP_DATANODE_REMOVE: {
-            numOpOther++;
-            DatanodeID nodeID = new DatanodeID();
-            nodeID.readFields(in);
-            //Datanodes are not persistent any more.
-            break;
+          break;
+        } 
+        case OP_SET_REPLICATION: {
+          numOpSetRepl++;
+          path = FSImage.readString(in);
+          short replication = adjustReplication(readShort(in));
+          fsDir.unprotectedSetReplication(path, replication, null);
+          break;
+        } 
+        case OP_RENAME: {
+          numOpRename++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
           }
-          case OP_SET_PERMISSIONS: {
-            numOpSetPerm++;
-            if (logVersion > -11)
-              throw new IOException("Unexpected opcode " + opcode
-                                    + " for version " + logVersion);
-            fsDir.unprotectedSetPermission(
-                FSImage.readString(in), FsPermission.read(in));
-            break;
+          String s = FSImage.readString(in);
+          String d = FSImage.readString(in);
+          timestamp = readLong(in);
+          DFSFileInfo dinfo = fsDir.getFileInfo(d);
+          fsDir.unprotectedRenameTo(s, d, timestamp);
+          fsNamesys.changeLease(s, d, dinfo);
+          break;
+        }
+        case OP_DELETE: {
+          numOpDelete++;
+          int length = in.readInt();
+          if (length != 2) {
+            throw new IOException("Incorrect data format. " 
+                                  + "delete operation.");
           }
-          case OP_SET_OWNER: {
-            numOpSetOwner++;
-            if (logVersion > -11)
-              throw new IOException("Unexpected opcode " + opcode
-                                    + " for version " + logVersion);
-            fsDir.unprotectedSetOwner(FSImage.readString(in),
-                FSImage.readString(in), FSImage.readString(in));
-            break;
+          path = FSImage.readString(in);
+          timestamp = readLong(in);
+          old = fsDir.unprotectedDelete(path, timestamp);
+          if (old != null && old.isUnderConstruction()) {
+            INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
+            fsNamesys.leaseManager.removeLease(cons.clientName, path);
           }
-          case OP_SET_QUOTA: {
-            if (logVersion > -16) {
-              throw new IOException("Unexpected opcode " + opcode
-                  + " for version " + logVersion);
-            }
-            fsDir.unprotectedSetQuota(FSImage.readString(in), 
-                readLongWritable(in) );
-            break;
+          break;
+        }
+        case OP_MKDIR: {
+          numOpMkDir++;
+          PermissionStatus permissions = fsNamesys.getUpgradePermission();
+          int length = in.readInt();
+          if (length != 2) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
           }
-          case OP_CLEAR_QUOTA: {
-            if (logVersion > -16) {
-              throw new IOException("Unexpected opcode " + opcode
-                  + " for version " + logVersion);
-            }
-            fsDir.unprotectedClearQuota(FSImage.readString(in));
-            break;
+          path = FSImage.readString(in);
+          timestamp = readLong(in);
+
+          if (logVersion <= -11) {
+            permissions = PermissionStatus.read(in);
           }
-          default: {
-            throw new IOException("Never seen opcode " + opcode);
+          fsDir.unprotectedMkdir(path, permissions, timestamp);
+          break;
+        }
+        case OP_SET_GENSTAMP: {
+          numOpSetGenStamp++;
+          long lw = in.readLong();
+          fsDir.namesystem.setGenerationStamp(lw);
+          break;
+        } 
+        case OP_DATANODE_ADD: {
+          numOpOther++;
+          FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+          nodeimage.readFields(in);
+          //Datanodes are not persistent any more.
+          break;
+        }
+        case OP_DATANODE_REMOVE: {
+          numOpOther++;
+          DatanodeID nodeID = new DatanodeID();
+          nodeID.readFields(in);
+          //Datanodes are not persistent any more.
+          break;
+        }
+        case OP_SET_PERMISSIONS: {
+          numOpSetPerm++;
+          if (logVersion > -11)
+            throw new IOException("Unexpected opcode " + opcode
+                                  + " for version " + logVersion);
+          fsDir.unprotectedSetPermission(
+              FSImage.readString(in), FsPermission.read(in));
+          break;
+        }
+        case OP_SET_OWNER: {
+          numOpSetOwner++;
+          if (logVersion > -11)
+            throw new IOException("Unexpected opcode " + opcode
+                                  + " for version " + logVersion);
+          fsDir.unprotectedSetOwner(FSImage.readString(in),
+              FSImage.readString(in), FSImage.readString(in));
+          break;
+        }
+        case OP_SET_QUOTA: {
+          if (logVersion > -16) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
           }
+          fsDir.unprotectedSetQuota(FSImage.readString(in), 
+              readLongWritable(in) );
+          break;
+        }
+        case OP_CLEAR_QUOTA: {
+          if (logVersion > -16) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
           }
+          fsDir.unprotectedClearQuota(FSImage.readString(in));
+          break;
+        }
+        default: {
+          throw new IOException("Never seen opcode " + opcode);
+        }
         }
-      } finally {
-        in.close();
       }
-      FSImage.LOG.info("Edits file " + edits.getName() 
-          + " of size " + edits.length() + " edits # " + numEdits 
-          + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+    } finally {
+      in.close();
     }
+    FSImage.LOG.info("Edits file " + edits.getName() 
+        + " of size " + edits.length() + " edits # " + numEdits 
+        + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
 
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
@@ -718,11 +739,7 @@
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
-        DataOutputStream od = eStream.getOutputStream();
-        od.write(op);
-        for(Writable w : writables) {
-          w.write(od);
-        }
+        eStream.write(op, writables);
       } catch (IOException ie) {
         processIOError(idx);         
       }
@@ -747,7 +764,7 @@
   //
   // Sync all modifications done by this thread.
   //
-  public void logSync() {
+  public void logSync() throws IOException {
     ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
@@ -781,7 +798,7 @@
       // swap buffers
       for (int idx = 0; idx < editStreams.size(); idx++) {
         EditLogOutputStream eStream = editStreams.get(idx);
-        eStream.swap();
+        eStream.setReadyToFlush();
       }
     }
 
@@ -790,7 +807,7 @@
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
-        eStream.flushAndSyncOld();
+        eStream.flush();
       } catch (IOException ie) {
         //
         // remember the streams that encountered an error.
@@ -972,14 +989,13 @@
     assert(getNumStorageDirs() == editStreams.size());
     long size = 0;
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
-      assert(size == 0 || 
-             size == getEditFile(idx).length() + eStream.getBufSize());
-      size = getEditFile(idx).length() + eStream.getBufSize();
+      long curSize = editStreams.get(idx).length();
+      assert (size == 0 || size == curSize) : "All streams must be the same";
+      size = curSize;
     }
     return size;
   }
- 
+
   /**
    * Closes the current edit log and opens edits.new. 
    * Returns the lastModified time of the edits log.
@@ -1006,7 +1022,7 @@
     //
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
       try {
-        EditLogOutputStream eStream = new EditLogOutputStream(getEditNewFile(idx));
+        EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(idx));
         eStream.create();
         editStreams.add(eStream);
       } catch (IOException e) {
@@ -1063,7 +1079,7 @@
    * Returns the timestamp of the edit log
    */
   synchronized long getFsEditTime() {
-    return getEditFile(0).lastModified();
+    return editStreams.get(0).lastModified();
   }
 
   // sets the initial capacity of the flush buffer.

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Aug 15 17:06:05 2008
@@ -53,6 +53,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -857,10 +858,16 @@
    */
   int loadFSEdits(StorageDirectory sd) throws IOException {
     int numEdits = 0;
-    numEdits = editLog.loadFSEdits(getImageFile(sd, NameNodeFile.EDITS));
+    EditLogFileInputStream edits = 
+      new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
+    numEdits = FSEditLog.loadFSEdits(edits);
+    edits.close();
     File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
-    if (editsNew.exists()) 
-      numEdits += editLog.loadFSEdits(editsNew);
+    if (editsNew.exists() && editsNew.length() > 0) {
+      edits = new EditLogFileInputStream(editsNew);
+      numEdits += FSEditLog.loadFSEdits(edits);
+      edits.close();
+    }
     return numEdits;
   }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Aug 15 17:06:05 2008
@@ -30,7 +30,7 @@
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
 
 /**
  * This class tests the creation and validation of a checkpoint.
@@ -139,7 +139,7 @@
     for (int i = 0; i < numdirs; i++) {
       File editFile = fsimage.getEditFile(i);
       System.out.println("Verifying file: " + editFile);
-      int numEdits = editLog.loadFSEdits(editFile);
+      int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile));
       int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();
       System.out.println("Number of outstanding leases " + numLeases);
       assertEquals(0, numLeases);