Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/26 10:25:49 UTC

svn commit: r425672 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSDirectory.java src/java/org/apache/hadoop/dfs/FSEditLog.java src/java/org/apache/hadoop/dfs/FSImage.java

Author: cutting
Date: Wed Jul 26 01:25:49 2006
New Revision: 425672

URL: http://svn.apache.org/viewvc?rev=425672&view=rev
Log:
HADOOP-335.  Refactor namenode logging.  Contributed by Konstantin.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=425672&r1=425671&r2=425672&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 26 01:25:49 2006
@@ -90,6 +90,9 @@
     writing zero-compressed integers (VInts and VLongs).
     (Hairong Kuang via cutting)
 
+26. HADOOP-335.  Refactor DFS namespace/transaction logging in
+    namenode.   (Konstantin Shvachko via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=425672&r1=425671&r2=425672&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Jul 26 01:25:49 2006
@@ -21,7 +21,6 @@
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.Metrics;
@@ -34,21 +33,12 @@
  * It keeps the filename->blockset mapping always-current
  * and logged to disk.
  * 
+ * TODO: Factor out to a standalone class.
+ * 
  * @author Mike Cafarella
  *************************************************/
 class FSDirectory implements FSConstants {
-    private static final String FS_IMAGE = "fsimage";
-    private static final String NEW_FS_IMAGE = "fsimage.new";
-    private static final String OLD_FS_IMAGE = "fsimage.old";
-
-    private static final byte OP_ADD = 0;
-    private static final byte OP_RENAME = 1;
-    private static final byte OP_DELETE = 2;
-    private static final byte OP_MKDIR = 3;
-    private static final byte OP_SET_REPLICATION = 4;
 
-    private int numFilesDeleted = 0;
-    
     /******************************************************
      * We keep an in-memory representation of the file/block
      * hierarchy.
@@ -92,6 +82,38 @@
         public short getReplication() {
           return this.blockReplication;
         }
+        
+        /**
+         * Get local file name
+         * @return local file name
+         */
+        String getLocalName() {
+          return name;
+        }
+
+        /**
+         * Get file blocks 
+         * @return file blocks
+         */
+        Block[] getBlocks() {
+          return this.blocks;
+        }
+        
+        /**
+         * Get parent directory 
+         * @return parent INode
+         */
+        INode getParent() {
+          return this.parent;
+        }
+
+        /**
+         * Get children 
+         * @return TreeMap of children
+         */
+        TreeMap getChildren() {
+          return this.children;
+        }
 
         /**
          * This is the external interface
@@ -182,7 +204,7 @@
                 return true;
             }
         }
-
+          
         /**
          * Collect all the blocks at this INode and all its children.
          * This operation is performed after a node is removed from the tree,
@@ -269,103 +291,43 @@
                 v.add(child);
             }
         }
-
-        /**
-         */
-        void saveImage(String parentPrefix, DataOutputStream out) throws IOException {
-            String fullName = "";
-            if (parent != null) {
-                fullName = parentPrefix + "/" + name;
-                new UTF8(fullName).write(out);
-                out.writeShort(blockReplication);
-                if (blocks == null) {
-                    out.writeInt(0);
-                } else {
-                    out.writeInt(blocks.length);
-                    for (int i = 0; i < blocks.length; i++) {
-                        blocks[i].write(out);
-                    }
-                }
-            }
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                child.saveImage(fullName, out);
-            }
-        }
     }
 
     
     INode rootDir = new INode("");
     TreeMap activeBlocks = new TreeMap();
     TreeMap activeLocks = new TreeMap();
-    DataOutputStream editlog = null;
+    FSImage fsImage;  
     boolean ready = false;
-    int namespaceID = 0;  /// a persistent attribute of the namespace
-
+    int namespaceID = 0;    // TODO: move to FSImage class, it belongs there
+    // Metrics members
     private MetricsRecord metricsRecord = null;
+    private int numFilesDeleted = 0;
     
     /** Access an existing dfs name directory. */
     public FSDirectory(File dir, Configuration conf) throws IOException {
-        File fullimage = new File(dir, "image");
-        if (! fullimage.exists()) {
-          throw new IOException("NameNode not formatted: " + dir);
-        }
-        File edits = new File(dir, "edits");
-        if (loadFSImage(fullimage, edits, conf)) {
-            saveFSImage(fullimage, edits);
-        }
-
-        synchronized (this) {
-            this.ready = true;
-            this.notifyAll();
-            this.editlog = new DataOutputStream(new FileOutputStream(edits));
-            editlog.writeInt( DFS_CURRENT_VERSION );
-        }
-     
-        metricsRecord = Metrics.createRecord("dfs", "namenode");
+      this.fsImage = new FSImage( dir, conf );
+      fsImage.loadFSImage( this, conf );
+      synchronized (this) {
+        this.ready = true;
+        this.notifyAll();
+        fsImage.getEditLog().create();
+      }
+      metricsRecord = Metrics.createRecord("dfs", "namenode");
     }
 
     /** Create a new dfs name directory.  Caution: this destroys all files
-     * in this filesystem. */
-    public static void format(File dir, Configuration conf)
-      throws IOException {
-        File image = new File(dir, "image");
-        File edits = new File(dir, "edits");
-
-        if (!((!image.exists() || FileUtil.fullyDelete(image)) &&
-              (!edits.exists() || edits.delete()) &&
-              image.mkdirs())) {
-          
-          throw new IOException("Unable to format: "+dir);
-        }
+     * in this filesystem.
+     * @deprecated use @link FSImage#format(File, Configuration) instead */
+    public static void format(File dir, Configuration conf) throws IOException {
+      FSImage.format( dir, conf );
     }
     
     /**
-     * Generate new namespaceID.
-     * 
-     * namespaceID is a persistent attribute of the namespace.
-     * It is generated when the namenode is formatted and remains the same
-     * during the life cycle of the namenode.
-     * When a datanodes register they receive it as the registrationID,
-     * which is checked every time the datanode is communicating with the 
-     * namenode. Datanodes that do not 'know' the namespaceID are rejected.
-     * 
-     * @return new namespaceID
-     */
-    private int newNamespaceID() {
-      Random r = new Random();
-      r.setSeed( System.currentTimeMillis() );
-      int newID = 0;
-      while( newID == 0)
-        newID = r.nextInt();
-      return newID;
-    }
-
-    /**
      * Shutdown the filestore
      */
     public void close() throws IOException {
-        editlog.close();
+        fsImage.getEditLog().close();
     }
 
     /**
@@ -385,283 +347,6 @@
     }
 
     /**
-     * Load in the filesystem image.  It's a big list of
-     * filenames and blocks.  Return whether we should
-     * "re-save" and consolidate the edit-logs
-     */
-    boolean loadFSImage( File fsdir, 
-                         File edits, 
-                         Configuration conf
-                       ) throws IOException {
-        //
-        // Atomic move sequence, to recover from interrupted save
-        //
-        File curFile = new File(fsdir, FS_IMAGE);
-        File newFile = new File(fsdir, NEW_FS_IMAGE);
-        File oldFile = new File(fsdir, OLD_FS_IMAGE);
-
-        // Maybe we were interrupted between 2 and 4
-        if (oldFile.exists() && curFile.exists()) {
-            oldFile.delete();
-            if (edits.exists()) {
-                edits.delete();
-            }
-        } else if (oldFile.exists() && newFile.exists()) {
-            // Or maybe between 1 and 2
-            newFile.renameTo(curFile);
-            oldFile.delete();
-        } else if (curFile.exists() && newFile.exists()) {
-            // Or else before stage 1, in which case we lose the edits
-            newFile.delete();
-        }
-
-        //
-        // Load in bits
-        //
-        boolean needToSave = true;
-        int imgVersion = DFS_CURRENT_VERSION;
-        if (curFile.exists()) {
-            DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
-            try {
-                // read image version: first appeared in version -1
-                imgVersion = in.readInt();
-                // read namespaceID: first appeared in version -2
-                if( imgVersion <= -2 )
-                  namespaceID = in.readInt();
-                // read number of files
-                int numFiles = 0;
-                // version 0 does not store version #
-                // starts directly with the number of files
-                if( imgVersion >= 0 ) {  
-                  numFiles = imgVersion;
-                  imgVersion = 0;
-                } else 
-                  numFiles = in.readInt();
-                  
-                needToSave = ( imgVersion != DFS_CURRENT_VERSION );
-                if( imgVersion < DFS_CURRENT_VERSION ) // future version
-                  throw new IOException(
-                              "Unsupported version of the file system image: "
-                              + imgVersion
-                              + ". Current version = " 
-                              + DFS_CURRENT_VERSION + "." );
-                
-                // read file info
-                short replication = (short)conf.getInt("dfs.replication", 3);
-                for (int i = 0; i < numFiles; i++) {
-                    UTF8 name = new UTF8();
-                    name.readFields(in);
-                    // version 0 does not support per file replication
-                    if( !(imgVersion >= 0) ) {
-                      replication = in.readShort(); // other versions do
-                      replication = adjustReplication( replication, conf );
-                    }
-                    int numBlocks = in.readInt();
-                    Block blocks[] = null;
-                    if (numBlocks > 0) {
-                        blocks = new Block[numBlocks];
-                        for (int j = 0; j < numBlocks; j++) {
-                            blocks[j] = new Block();
-                            blocks[j].readFields(in);
-                        }
-                    }
-                    unprotectedAddFile(name, 
-                            new INode( name.toString(), blocks, replication ));
-                }
-            } finally {
-                in.close();
-            }
-        }
-
-        if( namespaceID == 0 )
-          namespaceID = newNamespaceID();
-        
-        return needToSave || ( edits.exists() && loadFSEdits(edits, conf) > 0 );
-    }
-
-    /**
-     * Load an edit log, and apply the changes to the in-memory structure
-     *
-     * This is where we apply edits that we've been writing to disk all
-     * along.
-     */
-    int loadFSEdits(File edits, Configuration conf) throws IOException {
-        int numEdits = 0;
-        int logVersion = 0;
-
-        if (edits.exists()) {
-            DataInputStream in = new DataInputStream(
-                                    new BufferedInputStream(
-                                        new FileInputStream(edits)));
-            // Read log file version. Could be missing. 
-            in.mark( 4 );
-            if( in.available() > 0 ) {
-              logVersion = in.readByte();
-              in.reset();
-              if( logVersion >= 0 )
-                logVersion = 0;
-              else
-                logVersion = in.readInt();
-              if( logVersion < DFS_CURRENT_VERSION ) // future version
-                  throw new IOException(
-                            "Unexpected version of the file system log file: "
-                            + logVersion
-                            + ". Current version = " 
-                            + DFS_CURRENT_VERSION + "." );
-            }
-            
-            short replication = (short)conf.getInt("dfs.replication", 3);
-            try {
-                while (in.available() > 0) {
-                    byte opcode = in.readByte();
-                    numEdits++;
-                    switch (opcode) {
-                    case OP_ADD: {
-                        UTF8 name = new UTF8();
-                        ArrayWritable aw = null;
-                        Writable writables[];
-                        // version 0 does not support per file replication
-                        if( logVersion >= 0 )
-                          name.readFields(in);  // read name only
-                        else {  // other versions do
-                          // get name and replication
-                          aw = new ArrayWritable(UTF8.class);
-                          aw.readFields(in);
-                          writables = aw.get(); 
-                          if( writables.length != 2 )
-                            throw new IOException("Incorrect data fortmat. " 
-                                           + "Name & replication pair expected");
-                          name = (UTF8) writables[0];
-                          replication = Short.parseShort(
-                                              ((UTF8)writables[1]).toString());
-                          replication = adjustReplication( replication, conf );
-                        }
-                        // get blocks
-                        aw = new ArrayWritable(Block.class);
-                        aw.readFields(in);
-                        writables = aw.get();
-                        Block blocks[] = new Block[writables.length];
-                        System.arraycopy(writables, 0, blocks, 0, blocks.length);
-                        // add to the file tree
-                        unprotectedAddFile(name, 
-                            new INode( name.toString(), blocks, replication ));
-                        break;
-                    }
-                    case OP_SET_REPLICATION: {
-                        UTF8 src = new UTF8();
-                        UTF8 repl = new UTF8();
-                        src.readFields(in);
-                        repl.readFields(in);
-                        replication=adjustReplication(
-                                fromLogReplication(repl),
-                                conf);
-                        unprotectedSetReplication(src.toString(), 
-                                                  replication,
-                                                  null);
-                        break;
-                    } 
-                    case OP_RENAME: {
-                        UTF8 src = new UTF8();
-                        UTF8 dst = new UTF8();
-                        src.readFields(in);
-                        dst.readFields(in);
-                        unprotectedRenameTo(src, dst);
-                        break;
-                    }
-                    case OP_DELETE: {
-                        UTF8 src = new UTF8();
-                        src.readFields(in);
-                        unprotectedDelete(src);
-                        break;
-                    }
-                    case OP_MKDIR: {
-                        UTF8 src = new UTF8();
-                        src.readFields(in);
-                        unprotectedMkdir(src.toString());
-                        break;
-                    }
-                    default: {
-                        throw new IOException("Never seen opcode " + opcode);
-                    }
-                    }
-                }
-            } finally {
-                in.close();
-            }
-        }
-        
-        if( logVersion != DFS_CURRENT_VERSION ) // other version
-          numEdits++; // save this image asap
-        return numEdits;
-    }
-
-    private static short adjustReplication( short replication, Configuration conf) {
-        short minReplication = (short)conf.getInt("dfs.replication.min", 1);
-        if( replication<minReplication ) {
-            replication = minReplication;
-        }
-        short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
-        if( replication>maxReplication ) {
-            replication = maxReplication;
-        }
-        return replication;
-    }
-    /**
-     * Save the contents of the FS image
-     */
-    void saveFSImage(File fullimage, File edits) throws IOException {
-        File curFile = new File(fullimage, FS_IMAGE);
-        File newFile = new File(fullimage, NEW_FS_IMAGE);
-        File oldFile = new File(fullimage, OLD_FS_IMAGE);
-
-        //
-        // Write out data
-        //
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
-        try {
-            out.writeInt(DFS_CURRENT_VERSION);
-            out.writeInt(this.namespaceID);
-            out.writeInt(rootDir.numItemsInTree() - 1);
-            rootDir.saveImage("", out);
-        } finally {
-            out.close();
-        }
-
-        //
-        // Atomic move sequence
-        //
-        // 1.  Move cur to old
-        curFile.renameTo(oldFile);
-        
-        // 2.  Move new to cur
-        newFile.renameTo(curFile);
-
-        // 3.  Remove pending-edits file (it's been integrated with newFile)
-        edits.delete();
-        
-        // 4.  Delete old
-        oldFile.delete();
-    }
-
-    /**
-     * Write an operation to the edit log
-     */
-    void logEdit(byte op, Writable w1, Writable w2) {
-        synchronized (editlog) {
-            try {
-                editlog.write(op);
-                if (w1 != null) {
-                    w1.write(editlog);
-                }
-                if (w2 != null) {
-                    w2.write(editlog);
-                }
-            } catch (IOException ie) {
-            }
-        }
-    }
-    
-    /**
      * Add the given filename to the fs.
      */
     public boolean addFile(UTF8 path, Block[] blocks, short replication) {
@@ -677,26 +362,13 @@
                     +blocks.length+" blocks to the file system" );
            return false;
         }
-        // add createRecord file record to log
-        UTF8 nameReplicationPair[] = new UTF8[] { 
-                              path, 
-                              toLogReplication( replication )};
-        logEdit(OP_ADD,
-                new ArrayWritable( UTF8.class, nameReplicationPair ), 
-                new ArrayWritable( Block.class, newNode.blocks ));
+        // add create file record to log
+        fsImage.getEditLog().logCreateFile( newNode );
         NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                 +path+" with "+blocks.length+" blocks is added to the file system" );
         return true;
     }
     
-    private static UTF8 toLogReplication( short replication ) {
-      return new UTF8( Short.toString(replication));
-    }
-    
-    private static short fromLogReplication( UTF8 replication ) {
-      return Short.parseShort(replication.toString());
-    }    
-    
     /**
      */
     boolean unprotectedAddFile(UTF8 path, INode newNode) {
@@ -716,20 +388,23 @@
         }
       }
     }
+    
+    boolean unprotectedAddFile(UTF8 path, Block[] blocks, short replication ) {
+      return unprotectedAddFile( path,  
+                    new INode( path.toString(), blocks, replication ));
+    }
 
     /**
      * Change the filename
      */
     public boolean renameTo(UTF8 src, UTF8 dst) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
-                +src+" to "+dst );
-        waitForReady();
-        if (unprotectedRenameTo(src, dst)) {
-            logEdit(OP_RENAME, src, dst);
-            return true;
-        } else {
-            return false;
-        }
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
+          +src+" to "+dst );
+      waitForReady();
+      if( ! unprotectedRenameTo(src, dst) )
+        return false;
+      fsImage.getEditLog().logRename(src, dst);
+      return true;
     }
 
     /**
@@ -783,23 +458,21 @@
      * @return array of file blocks
      * @throws IOException
      */
-    public Block[] setReplication(String src, 
-                              short replication,
-                              Vector oldReplication
-                             ) throws IOException {
+    Block[] setReplication( String src, 
+                            short replication,
+                            Vector oldReplication
+                           ) throws IOException {
       waitForReady();
       Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication );
-      if( fileBlocks != null )  // 
-        logEdit(OP_SET_REPLICATION, 
-                new UTF8(src), 
-                toLogReplication( replication ));
+      if( fileBlocks != null )  // log replication change
+        fsImage.getEditLog().logSetReplication( src, replication );
       return fileBlocks;
     }
 
-    private Block[] unprotectedSetReplication( String src, 
-                                          short replication,
-                                          Vector oldReplication
-                                        ) throws IOException {
+    Block[] unprotectedSetReplication(  String src, 
+                                        short replication,
+                                        Vector oldReplication
+                                      ) throws IOException {
       if( oldReplication == null )
         oldReplication = new Vector();
       oldReplication.setSize(1);
@@ -847,7 +520,7 @@
         waitForReady();
         Block[] blocks = unprotectedDelete(src); 
         if( blocks != null )
-          logEdit(OP_DELETE, src, null);
+          fsImage.getEditLog().logDelete( src );
         return blocks;
     }
 
@@ -984,7 +657,7 @@
     }
 
     /**
-     * Create the given directory and all its parent dirs.
+     * @deprecated use @link #mkdirs(String) instead
      */
     public boolean mkdirs(UTF8 src) {
         return mkdirs(src.toString());
@@ -1019,7 +692,7 @@
                if (inserted != null) {
                    NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
                         +"created directory "+cur );
-                   logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
+                   fsImage.getEditLog().logMkDir( inserted );
                } // otherwise cur exists, continue
             } catch (FileNotFoundException e ) {
                 NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
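
The write path that the reworked FSDirectory settles on is worth spelling out: every mutator that callers go through (addFile, renameTo, setReplication, delete, mkdirs) first applies the change to the in-memory tree via an unprotected* method and appends to the edit log only if the change took effect, while replay at startup (FSEditLog.loadFSEdits in the new file below) calls the same unprotected* methods directly, so replayed operations are never logged a second time. A minimal self-contained sketch of that shape, using invented toy classes rather than the Hadoop ones:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy illustration of the "apply in memory, then log" split between FSDirectory and FSEditLog.
    class ToyNamespace {
        private final Map<String, String> files = new HashMap<String, String>(); // stands in for the INode tree
        private final List<String> editLog = new ArrayList<String>();            // stands in for FSEditLog

        // Public mutator: mutate first, log only on success
        // (FSDirectory.renameTo calls fsImage.getEditLog().logRename(...) the same way).
        boolean renameTo(String src, String dst) {
            if (!unprotectedRenameTo(src, dst))
                return false;
            editLog.add("RENAME " + src + " " + dst);
            return true;
        }

        // Shared by renameTo() above and by replay(); never touches the log,
        // which is why FSEditLog.loadFSEdits can call it without duplicating records.
        boolean unprotectedRenameTo(String src, String dst) {
            String contents = files.remove(src);
            if (contents == null)
                return false;
            files.put(dst, contents);
            return true;
        }

        // Startup replay: apply each saved record through the unprotected path only.
        void replay(List<String> savedEdits) {
            for (String record : savedEdits) {
                String[] parts = record.split(" ");
                if ("RENAME".equals(parts[0]))
                    unprotectedRenameTo(parts[1], parts[2]);
            }
        }
    }

In the real classes the records are opcode bytes plus Writable operands rather than strings; the new FSEditLog added below defines them.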

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=425672&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Wed Jul 26 01:25:49 2006
@@ -0,0 +1,270 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * FSEditLog maintains a log of the namespace modifications.
+ * 
+ * @author Konstantin Shvachko
+ */
+class FSEditLog {
+  private static final byte OP_ADD = 0;
+  private static final byte OP_RENAME = 1;
+  private static final byte OP_DELETE = 2;
+  private static final byte OP_MKDIR = 3;
+  private static final byte OP_SET_REPLICATION = 4;
+  
+  private File editsFile;
+  DataOutputStream editsStream = null;
+  
+  FSEditLog( File edits ) {
+    this.editsFile = edits;
+  }
+  
+  File getEditsFile() {
+    return this.editsFile;
+  }
+
+  /**
+   * Initialize the output stream for logging.
+   * 
+   * @throws IOException
+   */
+  void create() throws IOException {
+    editsStream = new DataOutputStream(new FileOutputStream(editsFile));
+    editsStream.writeInt( FSConstants.DFS_CURRENT_VERSION );
+  }
+  
+  /**
+   * Shutdown the filestore
+   */
+  void close() throws IOException {
+    editsStream.close();
+  }
+
+  /**
+   * Load an edit log, and apply the changes to the in-memory structure
+   *
+   * This is where we apply edits that we've been writing to disk all
+   * along.
+   */
+  int loadFSEdits( FSDirectory fsDir, 
+                   Configuration conf
+                 ) throws IOException {
+    int numEdits = 0;
+    int logVersion = 0;
+    
+    if (editsFile.exists()) {
+      DataInputStream in = new DataInputStream(
+          new BufferedInputStream(
+              new FileInputStream(editsFile)));
+      // Read log file version. Could be missing. 
+      in.mark( 4 );
+      if( in.available() > 0 ) {
+        logVersion = in.readByte();
+        in.reset();
+        if( logVersion >= 0 )
+          logVersion = 0;
+        else
+          logVersion = in.readInt();
+        if( logVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+          throw new IOException(
+              "Unexpected version of the file system log file: "
+              + logVersion
+              + ". Current version = " 
+              + FSConstants.DFS_CURRENT_VERSION + "." );
+      }
+      
+      short replication = (short)conf.getInt("dfs.replication", 3);
+      try {
+        while (in.available() > 0) {
+          byte opcode = in.readByte();
+          numEdits++;
+          switch (opcode) {
+          case OP_ADD: {
+            UTF8 name = new UTF8();
+            ArrayWritable aw = null;
+            Writable writables[];
+            // version 0 does not support per file replication
+            if( logVersion >= 0 )
+              name.readFields(in);  // read name only
+            else {  // other versions do
+              // get name and replication
+              aw = new ArrayWritable(UTF8.class);
+              aw.readFields(in);
+              writables = aw.get(); 
+              if( writables.length != 2 )
+                throw new IOException("Incorrect data format. " 
+                    + "Name & replication pair expected");
+              name = (UTF8) writables[0];
+              replication = Short.parseShort(
+                  ((UTF8)writables[1]).toString());
+              replication = adjustReplication( replication, conf );
+            }
+            // get blocks
+            aw = new ArrayWritable(Block.class);
+            aw.readFields(in);
+            writables = aw.get();
+            Block blocks[] = new Block[writables.length];
+            System.arraycopy(writables, 0, blocks, 0, blocks.length);
+            // add to the file tree
+            fsDir.unprotectedAddFile(name, blocks, replication );
+            break;
+          }
+          case OP_SET_REPLICATION: {
+            UTF8 src = new UTF8();
+            UTF8 repl = new UTF8();
+            src.readFields(in);
+            repl.readFields(in);
+            replication = adjustReplication(
+                            fromLogReplication(repl),
+                            conf);
+            fsDir.unprotectedSetReplication(src.toString(), 
+                replication,
+                null);
+            break;
+          } 
+          case OP_RENAME: {
+            UTF8 src = new UTF8();
+            UTF8 dst = new UTF8();
+            src.readFields(in);
+            dst.readFields(in);
+            fsDir.unprotectedRenameTo(src, dst);
+            break;
+          }
+          case OP_DELETE: {
+            UTF8 src = new UTF8();
+            src.readFields(in);
+            fsDir.unprotectedDelete(src);
+            break;
+          }
+          case OP_MKDIR: {
+            UTF8 src = new UTF8();
+            src.readFields(in);
+            fsDir.unprotectedMkdir(src.toString());
+            break;
+          }
+          default: {
+            throw new IOException("Never seen opcode " + opcode);
+          }
+          }
+        }
+      } finally {
+        in.close();
+      }
+    }
+    
+    if( logVersion != FSConstants.DFS_CURRENT_VERSION ) // other version
+      numEdits++; // save this image asap
+    return numEdits;
+  }
+  
+  static short adjustReplication( short replication, Configuration conf) {
+    short minReplication = (short)conf.getInt("dfs.replication.min", 1);
+    if( replication<minReplication ) {
+      replication = minReplication;
+    }
+    short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
+    if( replication>maxReplication ) {
+      replication = maxReplication;
+    }
+    return replication;
+  }
+
+  /**
+   * Write an operation to the edit log
+   */
+  void logEdit(byte op, Writable w1, Writable w2) {
+    synchronized (editsStream) {
+      try {
+        editsStream.write(op);
+        if (w1 != null) {
+          w1.write(editsStream);
+        }
+        if (w2 != null) {
+          w2.write(editsStream);
+        }
+      } catch (IOException ie) {
+        // TODO: Must report an error here
+      }
+    }
+    // TODO: initialize checkpointing if the log is large enough
+  }
+
+  /** 
+   * Add create file record to edit log
+   */
+  void logCreateFile( FSDirectory.INode newNode ) {
+    UTF8 nameReplicationPair[] = new UTF8[] { 
+                        new UTF8( newNode.computeName() ), 
+                        FSEditLog.toLogReplication( newNode.getReplication() )};
+    logEdit(OP_ADD,
+            new ArrayWritable( UTF8.class, nameReplicationPair ), 
+            new ArrayWritable( Block.class, newNode.getBlocks() ));
+  }
+  
+  /** 
+   * Add create directory record to edit log
+   */
+  void logMkDir( FSDirectory.INode newNode ) {
+    logEdit(OP_MKDIR, new UTF8( newNode.computeName() ), null );
+  }
+  
+  /** 
+   * Add rename record to edit log
+   * TODO: use String parameters until just before writing to disk
+   */
+  void logRename( UTF8 src, UTF8 dst ) {
+    logEdit(OP_RENAME, src, dst);
+  }
+  
+  /** 
+   * Add set replication record to edit log
+   */
+  void logSetReplication( String src, short replication ) {
+    logEdit(OP_SET_REPLICATION, 
+            new UTF8(src), 
+            FSEditLog.toLogReplication( replication ));
+  }
+  
+  /** 
+   * Add delete file record to edit log
+   */
+  void logDelete( UTF8 src ) {
+    logEdit(OP_DELETE, src, null);
+  }
+  
+  static UTF8 toLogReplication( short replication ) {
+    return new UTF8( Short.toString(replication));
+  }
+  
+  static short fromLogReplication( UTF8 replication ) {
+    return Short.parseShort(replication.toString());
+  }    
+}
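
To make the on-disk layout concrete: FSEditLog.create() stamps the edits file with a single int version (FSConstants.DFS_CURRENT_VERSION), and each logEdit() call then appends one opcode byte followed by its operands serialized as Hadoop Writables (UTF8 or ArrayWritable); loadFSEdits() peeks at the first byte to decide whether a version stamp is present at all and replays records until the stream is exhausted. The sketch below imitates that "version header + (opcode, operands)*" framing with plain java.io streams and writeUTF strings, so the bytes differ from a real edits file; the opcode numbering matches the constants above, everything else is illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustration of the edit-log framing: one version stamp, then (opcode, operands) records.
    public class EditLogFramingDemo {
        static final int VERSION = -1;     // stands in for FSConstants.DFS_CURRENT_VERSION (value not shown in this patch)
        static final byte OP_RENAME = 1;   // same numbering as FSEditLog
        static final byte OP_DELETE = 2;

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);

            out.writeInt(VERSION);         // FSEditLog.create() writes the version first

            out.writeByte(OP_RENAME);      // one record: opcode ...
            out.writeUTF("/a");            // ... followed by its operands (the real log
            out.writeUTF("/b");            //     writes UTF8/ArrayWritable Writables instead)

            out.writeByte(OP_DELETE);
            out.writeUTF("/b");
            out.close();

            // Replay: read the header, then records until nothing is left,
            // the same loop shape as FSEditLog.loadFSEdits().
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            System.out.println("log version " + in.readInt());
            while (in.available() > 0) {
                byte opcode = in.readByte();
                switch (opcode) {
                case OP_RENAME:
                    System.out.println("rename " + in.readUTF() + " -> " + in.readUTF());
                    break;
                case OP_DELETE:
                    System.out.println("delete " + in.readUTF());
                    break;
                default:
                    throw new IOException("Unknown opcode " + opcode);
                }
            }
            in.close();
        }
    }

Note also that loadFSEdits() clamps any replication value it reads through adjustReplication(), which bounds it by dfs.replication.min (default 1) and dfs.replication.max (default 512); FSImage reuses the same helper when reading the image.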

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=425672&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Wed Jul 26 01:25:49 2006
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSDirectory.INode;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * FSImage handles checkpointing and logging of the namespace edits.
+ * 
+ * @author Konstantin Shvachko
+ */
+class FSImage {
+  private static final String FS_IMAGE = "fsimage";
+  private static final String NEW_FS_IMAGE = "fsimage.new";
+  private static final String OLD_FS_IMAGE = "fsimage.old";
+
+  private File imageDir;  /// directory that contains the image file 
+  private FSEditLog editLog;
+  // private int namespaceID = 0;    /// a persistent attribute of the namespace
+
+  /**
+   * 
+   */
+  FSImage( File fsDir, Configuration conf ) throws IOException {
+    this.imageDir = new File(fsDir, "image");
+    if (! imageDir.exists()) {
+      throw new IOException("NameNode not formatted: " + fsDir);
+    }
+    File edits = new File(fsDir, "edits");
+    this.editLog = new FSEditLog( edits );
+  }
+  
+  FSEditLog getEditLog() {
+    return editLog;
+  }
+
+  /**
+   * Load in the filesystem image.  It's a big list of
+   * filenames and blocks.  Return whether we should
+   * "re-save" and consolidate the edit-logs
+   */
+  void loadFSImage( FSDirectory fsDir, 
+                    Configuration conf
+                  ) throws IOException {
+    File edits = editLog.getEditsFile();
+    //
+    // Atomic move sequence, to recover from interrupted save
+    //
+    File curFile = new File(imageDir, FS_IMAGE);
+    File newFile = new File(imageDir, NEW_FS_IMAGE);
+    File oldFile = new File(imageDir, OLD_FS_IMAGE);
+
+    // Maybe we were interrupted between 2 and 4
+    if (oldFile.exists() && curFile.exists()) {
+      oldFile.delete();
+      if (edits.exists()) {
+        edits.delete();
+      }
+    } else if (oldFile.exists() && newFile.exists()) {
+      // Or maybe between 1 and 2
+      newFile.renameTo(curFile);
+      oldFile.delete();
+    } else if (curFile.exists() && newFile.exists()) {
+      // Or else before stage 1, in which case we lose the edits
+      newFile.delete();
+    }
+
+    //
+    // Load in bits
+    //
+    boolean needToSave = true;
+    int imgVersion = FSConstants.DFS_CURRENT_VERSION;
+    if (curFile.exists()) {
+      DataInputStream in = new DataInputStream(
+                              new BufferedInputStream(
+                                  new FileInputStream(curFile)));
+      try {
+        // read image version: first appeared in version -1
+        imgVersion = in.readInt();
+        // read namespaceID: first appeared in version -2
+        if( imgVersion <= -2 )
+          fsDir.namespaceID = in.readInt();
+        // read number of files
+        int numFiles = 0;
+        // version 0 does not store version #
+        // starts directly with the number of files
+        if( imgVersion >= 0 ) {  
+          numFiles = imgVersion;
+          imgVersion = 0;
+        } else 
+          numFiles = in.readInt();
+        
+        needToSave = ( imgVersion != FSConstants.DFS_CURRENT_VERSION );
+        if( imgVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+          throw new IOException(
+              "Unsupported version of the file system image: "
+              + imgVersion
+              + ". Current version = " 
+              + FSConstants.DFS_CURRENT_VERSION + "." );
+        
+        // read file info
+        short replication = (short)conf.getInt("dfs.replication", 3);
+        for (int i = 0; i < numFiles; i++) {
+          UTF8 name = new UTF8();
+          name.readFields(in);
+          // version 0 does not support per file replication
+          if( !(imgVersion >= 0) ) {
+            replication = in.readShort(); // other versions do
+            replication = FSEditLog.adjustReplication( replication, conf );
+          }
+          int numBlocks = in.readInt();
+          Block blocks[] = null;
+          if (numBlocks > 0) {
+            blocks = new Block[numBlocks];
+            for (int j = 0; j < numBlocks; j++) {
+              blocks[j] = new Block();
+              blocks[j].readFields(in);
+            }
+          }
+          fsDir.unprotectedAddFile(name, blocks, replication );
+        }
+      } finally {
+        in.close();
+      }
+    }
+    
+    if( fsDir.namespaceID == 0 )
+      fsDir.namespaceID = newNamespaceID();
+    
+    needToSave |= ( edits.exists() && editLog.loadFSEdits(fsDir, conf) > 0 );
+    if( needToSave )
+      saveFSImage( fsDir );
+  }
+
+  /**
+   * Save the contents of the FS image
+   */
+  void saveFSImage( FSDirectory fsDir ) throws IOException {
+    File curFile = new File(imageDir, FS_IMAGE);
+    File newFile = new File(imageDir, NEW_FS_IMAGE);
+    File oldFile = new File(imageDir, OLD_FS_IMAGE);
+    
+    //
+    // Write out data
+    //
+    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
+    try {
+      out.writeInt(FSConstants.DFS_CURRENT_VERSION);
+      out.writeInt(fsDir.namespaceID);
+      out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
+      saveImage( "", fsDir.rootDir, out );
+    } finally {
+      out.close();
+    }
+    
+    //
+    // Atomic move sequence
+    //
+    // 1.  Move cur to old
+    curFile.renameTo(oldFile);
+    // 2.  Move new to cur
+    newFile.renameTo(curFile);
+    // 3.  Remove pending-edits file (it's been integrated with newFile)
+    editLog.getEditsFile().delete();
+    // 4.  Delete old
+    oldFile.delete();
+  }
+
+  /**
+   * Generate new namespaceID.
+   * 
+   * namespaceID is a persistent attribute of the namespace.
+   * It is generated when the namenode is formatted and remains the same
+   * during the life cycle of the namenode.
+ * When a datanode registers, it receives it as the registrationID,
+ * which is checked every time the datanode communicates with the 
+   * namenode. Datanodes that do not 'know' the namespaceID are rejected.
+   * 
+   * @return new namespaceID
+   */
+  private int newNamespaceID() {
+    Random r = new Random();
+    r.setSeed( System.currentTimeMillis() );
+    int newID = 0;
+    while( newID == 0)
+      newID = r.nextInt();
+    return newID;
+  }
+  
+  /** Create a new dfs name directory.  Caution: this destroys all files
+   * in this filesystem. */
+  static void format(File dir, Configuration conf) throws IOException {
+    File image = new File(dir, "image");
+    File edits = new File(dir, "edits");
+    
+    if (!((!image.exists() || FileUtil.fullyDelete(image)) &&
+        (!edits.exists() || edits.delete()) &&
+        image.mkdirs())) {
+      throw new IOException("Unable to format: "+dir);
+    }
+  }
+
+  /**
+   * Save file tree image starting from the given root.
+   */
+  void saveImage( String parentPrefix, 
+                  FSDirectory.INode root, 
+                  DataOutputStream out ) throws IOException {
+    String fullName = "";
+    if( root.getParent() != null) {
+      fullName = parentPrefix + "/" + root.getLocalName();
+      new UTF8(fullName).write(out);
+      out.writeShort( root.getReplication() );
+      if( root.isDir() ) {
+        out.writeInt(0);
+      } else {
+        int nrBlocks = root.getBlocks().length;
+        out.writeInt( nrBlocks );
+        for (int i = 0; i < nrBlocks; i++)
+          root.getBlocks()[i].write(out);
+      }
+    }
+    for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
+      INode child = (INode) it.next();
+      saveImage( fullName, child, out );
+    }
+  }
+}
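
The part of FSImage most worth internalizing is the checkpoint rotation: the new image is always written to fsimage.new first, and the subsequent rename/delete steps are ordered so that whichever combination of files survives a crash tells loadFSImage() exactly how far the rotation got. Below is a standalone illustration of that sequence and of the matching recovery branches, written against plain java.io.File in a scratch directory; ImageRotationDemo and its placeholder file contents are invented for the example, and the real class serializes the namespace instead of dummy bytes:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    // Standalone illustration of the fsimage / fsimage.new / fsimage.old rotation in FSImage.
    public class ImageRotationDemo {

        // Mirrors FSImage.saveFSImage(): write the new checkpoint, then rotate.
        static void saveImage(File dir, byte[] imageBytes) throws IOException {
            File curFile = new File(dir, "fsimage");
            File newFile = new File(dir, "fsimage.new");
            File oldFile = new File(dir, "fsimage.old");
            File edits   = new File(dir, "edits");

            FileOutputStream out = new FileOutputStream(newFile);  // stage 0: write the full image
            try {
                out.write(imageBytes);
            } finally {
                out.close();
            }
            curFile.renameTo(oldFile);   // 1. current image becomes the fallback copy
            newFile.renameTo(curFile);   // 2. the new image becomes current
            edits.delete();              // 3. edits are folded into the new image, drop them
            oldFile.delete();            // 4. fallback no longer needed
        }

        // Mirrors the recovery branches at the top of FSImage.loadFSImage().
        static void recover(File dir) {
            File curFile = new File(dir, "fsimage");
            File newFile = new File(dir, "fsimage.new");
            File oldFile = new File(dir, "fsimage.old");
            File edits   = new File(dir, "edits");

            if (oldFile.exists() && curFile.exists()) {
                // Crashed between 2 and 4: the rotation already took effect, clean up.
                oldFile.delete();
                edits.delete();
            } else if (oldFile.exists() && newFile.exists()) {
                // Crashed between 1 and 2: finish the rotation.
                newFile.renameTo(curFile);
                oldFile.delete();
            } else if (curFile.exists() && newFile.exists()) {
                // Crashed before 1: discard the possibly incomplete new image;
                // the current image and the edit log are still intact.
                newFile.delete();
            }
        }

        public static void main(String[] args) throws IOException {
            File dir = new File("rotation-demo");
            dir.mkdirs();
            new FileOutputStream(new File(dir, "fsimage")).close();  // pretend an image exists
            new FileOutputStream(new File(dir, "edits")).close();    // and a pending edit log
            recover(dir);                                            // no-op on a clean directory
            saveImage(dir, new byte[] { 1, 2, 3 });
        }
    }

Once recovery has settled on a current image, loadFSImage() replays the edit log and, if the image format was old or any edits were applied, immediately writes a fresh checkpoint through saveFSImage().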