Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/03 23:39:27 UTC

svn commit: r525290 [1/3] - in /lucene/hadoop/trunk: ./ bin/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/

Author: cutting
Date: Tue Apr  3 14:39:25 2007
New Revision: 525290

URL: http://svn.apache.org/viewvc?view=rev&rev=525290
Log:
HADOOP-702.  Add tools to help automate HDFS upgrades.  Contributed by Konstantin.
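
The operator-facing workflow these tools enable is, roughly (the command sequence below is
illustrative; only the -upgrade, -rollback and -finalizeUpgrade options themselves come from
this change):

    bin/start-dfs.sh -upgrade                # convert on-disk state; a "previous" copy is kept
    bin/hadoop dfsadmin -finalizeUpgrade     # commit: saved state is discarded (irreversible)
    bin/start-dfs.sh -rollback               # or, after stopping dfs: restore the saved state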

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/bin/start-dfs.sh
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Apr  3 14:39:25 2007
@@ -94,6 +94,9 @@
 29. HADOOP-1156.  Fix a NullPointerException in MiniDFSCluster.
     (Hairong Kuang via cutting)
 
+30. HADOOP-702.  Add tools to help automate HDFS upgrades.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.12.3 (not yet released)
 

Modified: lucene/hadoop/trunk/bin/start-dfs.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/start-dfs.sh?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/bin/start-dfs.sh (original)
+++ lucene/hadoop/trunk/bin/start-dfs.sh Tue Apr  3 14:39:25 2007
@@ -1,15 +1,37 @@
 #!/bin/sh
 
-# Start hadoop dfs daemons.  Run this on master node.
+# Start hadoop dfs daemons.
+# Optionally upgrade or rollback dfs state.
+# Run this on master node.
+##
+
+usage="Usage: start-dfs.sh [-upgrade|-rollback]"
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/hadoop-config.sh
 
+# get arguments
+if [ $# -ge 1 ]; then
+	nameStartOpt=$1
+	shift
+	case $nameStartOpt in
+	  (-upgrade)
+	  	;;
+	  (-rollback) 
+	  	dataStartOpt=$nameStartOpt
+	  	;;
+	  (*)
+		  echo $usage
+		  exit 1
+	    ;;
+	esac
+fi
+
 # start dfs daemons
 # start namenode after datanodes, to minimize time namenode is up w/o data
 # note: datanodes will log connection errors until namenode starts
-"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode
-"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode
+"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode $nameStartOpt
+"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt
 "$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Tue Apr  3 14:39:25 2007
@@ -17,9 +17,40 @@
  */
 package org.apache.hadoop.dfs;
 
+import java.io.*;
 import org.apache.hadoop.io.*;
 
-import java.io.*;
+class DatanodeCommand implements Writable {
+  DatanodeProtocol.DataNodeAction action;
+  
+  public DatanodeCommand() {
+    this( DatanodeProtocol.DataNodeAction.DNA_UNKNOWN );
+  }
+  
+  public DatanodeCommand( DatanodeProtocol.DataNodeAction action ) {
+    this.action = action;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (BlockCommand.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new DatanodeCommand(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum( out, action );
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    this.action = (DatanodeProtocol.DataNodeAction)
+          WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
+  }
+}
 
 /****************************************************
  * A BlockCommand is an instruction to a datanode 
@@ -30,25 +61,11 @@
  * 
  * @author Mike Cafarella
  ****************************************************/
-class BlockCommand implements Writable {
-
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (BlockCommand.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new BlockCommand(); }
-         });
-    }
-
-    DatanodeProtocol.DataNodeAction action;
+class BlockCommand extends DatanodeCommand {
     Block blocks[];
     DatanodeInfo targets[][];
 
-    public BlockCommand() {
-      this.action = DatanodeProtocol.DataNodeAction.DNA_UNKNOWN;
-      this.blocks = new Block[0];
-      this.targets = new DatanodeInfo[0][];
-    }
+    public BlockCommand() {}
 
     /**
      * Create BlockCommand for transferring blocks to another datanode
@@ -56,7 +73,7 @@
      * @param targets   nodes to transfer
      */
     public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-      this.action = DatanodeProtocol.DataNodeAction.DNA_TRANSFER;
+      super(  DatanodeProtocol.DataNodeAction.DNA_TRANSFER );
       this.blocks = blocks;
       this.targets = targets;
     }
@@ -66,16 +83,11 @@
      * @param blocks  blocks to invalidate
      */
     public BlockCommand(Block blocks[]) {
-      this.action = DatanodeProtocol.DataNodeAction.DNA_INVALIDATE;
+      super( DatanodeProtocol.DataNodeAction.DNA_INVALIDATE );
       this.blocks = blocks;
       this.targets = new DatanodeInfo[0][];
     }
 
-    public BlockCommand( DatanodeProtocol.DataNodeAction action ) {
-      this();
-      this.action = action;
-    }
-
     public Block[] getBlocks() {
         return blocks;
     }
@@ -87,8 +99,16 @@
     ///////////////////////////////////////////
     // Writable
     ///////////////////////////////////////////
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (BlockCommand.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new BlockCommand(); }
+         });
+    }
+
     public void write(DataOutput out) throws IOException {
-        WritableUtils.writeEnum( out, action );
+        super.write( out );
         out.writeInt(blocks.length);
         for (int i = 0; i < blocks.length; i++) {
             blocks[i].write(out);
@@ -103,8 +123,7 @@
     }
 
     public void readFields(DataInput in) throws IOException {
-        this.action = (DatanodeProtocol.DataNodeAction)
-            WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
+        super.readFields( in );
         this.blocks = new Block[in.readInt()];
         for (int i = 0; i < blocks.length; i++) {
             blocks[i] = new Block();
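
The refactoring above splits wire serialization between the new DatanodeCommand base class and
its subclasses. A minimal standalone sketch of the pattern (hypothetical names; Hadoop's
WritableFactories registration and enum handling elided):

    import java.io.*;

    class BaseCommand {                         // stands in for DatanodeCommand
      int action;                               // serialized first, by the base class
      void write(DataOutput out) throws IOException { out.writeInt(action); }
      void readFields(DataInput in) throws IOException { action = in.readInt(); }
    }

    class BlocksCommand extends BaseCommand {   // stands in for BlockCommand
      long[] blocks = new long[0];
      void write(DataOutput out) throws IOException {
        super.write(out);                       // base fields go on the wire first
        out.writeInt(blocks.length);
        for (long b : blocks) out.writeLong(b);
      }
      void readFields(DataInput in) throws IOException {
        super.readFields(in);                   // read back in the same order
        blocks = new long[in.readInt()];
        for (int i = 0; i < blocks.length; i++) blocks[i] = in.readLong();
      }
    }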

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Apr  3 14:39:25 2007
@@ -29,11 +29,10 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-    /* 7 : periodic checkpoint added.
-     * 8 : refreshNodes added
-     * 9 : clientMachine is removed from open() and create().
+    /*
+     * 10: finalizeUpgrade() added
      */
-    public static final long versionID = 9L;  
+    public static final long versionID = 10L;  
   
     ///////////////////////////////////////
     // File contents
@@ -348,4 +347,12 @@
      */
     public void rollFsImage() throws IOException;
 
+    /**
+     * Finalize previous upgrade.
+     * Remove file system state saved during the upgrade.
+     * The upgrade will become irreversible.
+     * 
+     * @throws IOException
+     */
+    public void finalizeUpgrade() throws IOException;
 }
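
A hedged sketch of how a client reaches the new RPC end to end, mirroring what DFSAdmin does
below (finalizeUpgrade() is from this patch; the surrounding boilerplate is mine):

    package org.apache.hadoop.dfs;              // sidesteps any visibility questions

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FinalizeUpgradeExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs instanceof DistributedFileSystem)          // only meaningful against HDFS
          ((DistributedFileSystem) fs).finalizeUpgrade(); // discards saved state for good
        fs.close();
      }
    }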

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Tue Apr  3 14:39:25 2007
@@ -197,6 +197,26 @@
 
 
     /**
+     * Command to ask the namenode to finalize previously performed upgrade.
+     * Usage: java DFSAdmin -finalizeUpgrade
+     * @exception IOException 
+     */
+    public int finalizeUpgrade() throws IOException {
+      int exitCode = -1;
+
+      if (!(fs instanceof DistributedFileSystem)) {
+        System.out.println("FileSystem is " + fs.getUri());
+        return exitCode;
+      }
+
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      dfs.finalizeUpgrade();
+      exitCode = 0;
+   
+      return exitCode;
+    }
+
+    /**
      * Displays format of commands.
      * @param cmd The command that is being executed.
      */
@@ -210,11 +230,15 @@
           } else if ("-refreshNodes".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
                 + " [-refreshNodes]");
+          } else if ("-finalizeUpgrade".equals(cmd)) {
+            System.err.println("Usage: java DFSAdmin"
+                + " [-finalizeUpgrade]");
           } else {
             System.err.println("Usage: java DFSAdmin");
             System.err.println("           [-report]");
             System.err.println("           [-safemode enter | leave | get | wait]");
             System.err.println("           [-refreshNodes]");
+            System.err.println("           [-finalizeUpgrade]");
             System.err.println("           [-help [cmd]]");
           }
     }
@@ -253,6 +277,11 @@
                   printUsage(cmd);
                   return exitCode;
                 }
+        } else if ("-finalizeUpgrade".equals(cmd)) {
+                if (argv.length != 1) {
+                  printUsage(cmd);
+                  return exitCode;
+                }
         }
 
 
@@ -276,6 +305,8 @@
                 setSafeMode(argv, i);
             } else if ("-refreshNodes".equals(cmd)) {
                 exitCode = refreshNodes();
+            } else if ("-finalizeUpgrade".equals(cmd)) {
+                exitCode = finalizeUpgrade();
             } else if ("-help".equals(cmd)) {
                 if (i < argv.length) {
                     printHelp(argv[i]);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Apr  3 14:39:25 2007
@@ -377,6 +377,13 @@
     }
     
     /**
+     * @see ClientProtocol#finalizeUpgrade()
+     */
+    public void finalizeUpgrade() throws IOException {
+      namenode.finalizeUpgrade();
+    }
+
+    /**
      */
     public boolean mkdirs(UTF8 src) throws IOException {
         checkOpen();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Apr  3 14:39:25 2007
@@ -106,18 +106,20 @@
         return new InetSocketAddress(hostname, port);
     }
 
-    DatanodeProtocol namenode;
-    FSDataset data;
-    DatanodeRegistration dnRegistration;
+    DatanodeProtocol namenode = null;
+    FSDataset data = null;
+    DatanodeRegistration dnRegistration = null;
     private String networkLoc;
     volatile boolean shouldRun = true;
     Vector receivedBlockList = new Vector();
     int xmitsInProgress = 0;
     Daemon dataXceiveServer = null;
     long blockReportInterval;
+    long lastBlockReport = 0;
+    long lastHeartbeat = 0;
     long heartBeatInterval;
     private DataStorage storage = null;
-    private StatusHttpServer infoServer;
+    private StatusHttpServer infoServer = null;
     private DataNodeMetrics myMetrics = new DataNodeMetrics();
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
@@ -187,73 +189,57 @@
     }
 
     /**
-     * Create the DataNode given a configuration and an array of dataDirs.
-     * 'dataDirs' is where the blocks are stored.
+     * @deprecated
+     * TODO: only MiniDFSCluster needs it, should be removed
      */
-    DataNode(Configuration conf, String[] dataDirs) throws IOException {
-      this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
-    }
-    
-    DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
-        this(networkLoc, dataDirs,
-             createSocketAddr(conf.get("fs.default.name", "local")), conf);
-        // register datanode
-        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
-        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
-        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
-        //create a servlet to serve full-file content
-        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
-        this.infoServer.start();
-        this.dnRegistration.infoPort = this.infoServer.getPort();
-        // register datanode
-        try {
-          register();
-        } catch (IOException ie) {
-          try {
-            infoServer.stop();
-          } catch (Exception e) {
-          }
-          throw ie;
-        }
-        datanodeObject = this;
+    DataNode( Configuration conf, String networkLoc, String[] dataDirs ) throws IOException {
+      // networkLoc is ignored since it is already in the conf
+      this( conf, Storage.makeListOfFiles( dataDirs ) );
     }
     
     /**
-     * A DataNode can also be created with configuration information
-     * explicitly given.
-     * 
-     * @see DataStorage
+     * Create the DataNode given a configuration and an array of dataDirs.
+     * 'dataDirs' is where the blocks are stored.
      */
-    private DataNode(String networkLoc,
-                    String[] dataDirs, 
-                    InetSocketAddress nameNodeAddr, 
-                    Configuration conf ) throws IOException {
-      File[] volumes = new File[dataDirs.length];
-      for (int idx = 0; idx < dataDirs.length; idx++) {
-        volumes[idx] = new File(dataDirs[idx]);
+    DataNode( Configuration conf, 
+              AbstractList<File> dataDirs ) throws IOException {
+      try {
+        startDataNode( conf, dataDirs );
+      } catch (IOException ie) {
+        shutdown();
+        throw ie;
       }
-
+    }
+    
+    void startDataNode( Configuration conf, 
+                        AbstractList<File> dataDirs
+                       ) throws IOException {
       // use configured nameserver & interface to get local hostname
-      machineName =
-        DNS.getDefaultHost
-        (conf.get("dfs.datanode.dns.interface","default"),
-         conf.get("dfs.datanode.dns.nameserver","default"));
- 
-      // get storage info and lock the data dirs
-      storage = new DataStorage( volumes );
-      int numDirs = storage.getNumLocked();
-      if (numDirs == 0) { // all data dirs are in use
-        throw new IOException("Cannot start multiple Datanode instances "
-                              + "sharing the same data directories.\n"
-                              + StringUtils.arrayToString(dataDirs) + " are locked. ");
-      }
-      volumes = storage.getLockedDirs();
+      machineName = DNS.getDefaultHost(
+          conf.get("dfs.datanode.dns.interface","default"),
+          conf.get("dfs.datanode.dns.nameserver","default"));
+      InetSocketAddress nameNodeAddr = createSocketAddr(
+          conf.get("fs.default.name", "local"));
+
       // connect to name node
       this.namenode = (DatanodeProtocol) 
           RPC.waitForProxy(DatanodeProtocol.class,
                            DatanodeProtocol.versionID,
                            nameNodeAddr, 
                            conf);
+      // get version and id info from the name-node
+      NamespaceInfo nsInfo = handshake();
+
+      // read storage info, lock data dirs and transition fs state if necessary
+      StartupOption startOpt = (StartupOption)conf.get( "dfs.datanode.startup", 
+                                                        StartupOption.REGULAR );
+      assert startOpt != null : "Startup option must be set.";
+      storage = new DataStorage();
+      storage.recoverTransitionRead( nsInfo, dataDirs, startOpt );
+      
+      // initialize data node internal structure
+      this.data = new FSDataset( storage, conf );
+      
       // find free port
       ServerSocket ss = null;
       int tmpPort = conf.getInt("dfs.datanode.port", 50010);
@@ -268,15 +254,11 @@
         }
       }
       // construct registration
-      this.dnRegistration = new DatanodeRegistration(
-                                        DFS_CURRENT_VERSION, 
-                                        machineName + ":" + tmpPort, 
-                                        storage.getStorageID(),
-                                        -1,
-                                        "" );
-      this.networkLoc = networkLoc;
-      // initialize data node internal structure
-      this.data = new FSDataset(volumes, conf);
+      this.dnRegistration = new DatanodeRegistration( 
+                                    machineName + ":" + tmpPort, 
+                                    -1,   // info port determined later
+                                    storage );
+      
       this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
 
       long blockReportIntervalBasis =
@@ -284,7 +266,55 @@
       this.blockReportInterval =
         blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
       this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
-      this.nameNodeAddr = nameNodeAddr;
+      DataNode.nameNodeAddr = nameNodeAddr;
+
+      //create a servlet to serve full-file content
+      int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
+      String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
+      this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
+      this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
+      this.infoServer.start();
+      this.dnRegistration.infoPort = this.infoServer.getPort();
+      // get network location
+      this.networkLoc = conf.get( "dfs.datanode.rack" );
+      if( networkLoc == null )  // exec network script or set the default rack
+        networkLoc = getNetworkLoc( conf );
+      // register datanode
+      register();
+      datanodeObject = this;
+    }
+
+    private NamespaceInfo handshake() throws IOException {
+      NamespaceInfo nsInfo;
+      while( true ) {
+        try {
+          nsInfo = namenode.versionRequest();
+          break;
+        } catch( SocketTimeoutException e ) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + getNameNodeAddr());
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
+      String errorMsg = null;
+      // verify build version
+      if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
+        errorMsg = "Incompatible build versions: namenode BV = " 
+          + nsInfo.getBuildVersion() + "; datanode BV = "
+          + Storage.getBuildVersion();
+        LOG.fatal( errorMsg );
+        try {
+          namenode.errorReport( dnRegistration,
+                                DatanodeProtocol.NOTIFY, errorMsg );
+        } catch( SocketTimeoutException e ) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + getNameNodeAddr());
+        }
+        throw new IOException( errorMsg );
+      }
+      assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+        "Data-node and name-node layout versions must be the same.";
+      return nsInfo;
     }
 
     /** Return the DataNode object
@@ -314,7 +344,7 @@
      * 2) to receive a registrationID 
      * issued by the namenode to recognize registered datanodes.
      * 
-     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+     * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
      * @throws IOException
      */
     private void register() throws IOException {
@@ -355,7 +385,7 @@
         }
         if (storage != null) {
           try {
-            this.storage.closeAll();
+            this.storage.unlockAll();
           } catch (IOException ie) {
           }
         }
@@ -388,7 +418,6 @@
      */
     public void offerService() throws Exception {
      
-      long lastHeartbeat = 0, lastBlockReport = 0;
       LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
       //
@@ -410,47 +439,17 @@
             // -- Total capacity
             // -- Bytes remaining
             //
-            BlockCommand cmd = namenode.sendHeartbeat(dnRegistration, 
+            DatanodeCommand cmd = namenode.sendHeartbeat( dnRegistration, 
                                                       data.getCapacity(), 
                                                       data.getRemaining(), 
                                                       xmitsInProgress,
                                                       xceiverCount.getValue());
             //LOG.info("Just sent heartbeat, with name " + localName);
             lastHeartbeat = now;
-
-            if( cmd != null ) {
-              switch( cmd.action ) {
-              case DNA_TRANSFER:
-                //
-                // Send a copy of a block to another datanode
-                //
-                transferBlocks( cmd.getBlocks(), cmd.getTargets() );
-                break;
-              case DNA_INVALIDATE:
-                //
-                // Some local block(s) are obsolete and can be 
-                // safely garbage-collected.
-                //
-                Block toDelete[] = cmd.getBlocks();
-                data.invalidate(toDelete);
-                myMetrics.removedBlocks(toDelete.length);
-                break;
-              case DNA_SHUTDOWN:
-                // shut down the data node
-                this.shutdown();
-                continue;
-              case DNA_REGISTER:
-                // namenode requested a registration
-                register();
-                lastHeartbeat=0;
-                lastBlockReport=0;
-                continue;
-              default:
-                LOG.warn( "Unknown BlockCommand action: " + cmd.action);
-              }
-            }
+            if( ! processCommand( cmd ) )
+              continue;
           }
-            
+
           // send block report
           if (now - lastBlockReport > blockReportInterval) {
             //
@@ -458,9 +457,9 @@
             // Get back a list of local block(s) that are obsolete
             // and can be safely GC'ed.
             //
-            Block toDelete[] = namenode.blockReport(dnRegistration,
-                                                    data.getBlockReport());
-            data.invalidate(toDelete);
+            DatanodeCommand cmd = namenode.blockReport( dnRegistration,
+                                                        data.getBlockReport());
+            processCommand( cmd );
             lastBlockReport = now;
             continue;
           }
@@ -516,6 +515,51 @@
       } // while (shouldRun)
     } // offerService
 
+    /**
+     * Process a command received from the name-node in reply to a
+     * heartbeat or a block report.
+     * @param cmd the command, possibly null
+     * @return true if further processing may be required or false otherwise. 
+     * @throws IOException
+     */
+    private boolean processCommand( DatanodeCommand cmd ) throws IOException {
+      if( cmd == null )
+        return true;
+      switch( cmd.action ) {
+      case DNA_TRANSFER:
+        //
+        // Send a copy of a block to another datanode
+        //
+        BlockCommand bcmd = (BlockCommand)cmd;
+        transferBlocks( bcmd.getBlocks(), bcmd.getTargets() );
+        break;
+      case DNA_INVALIDATE:
+        //
+        // Some local block(s) are obsolete and can be 
+        // safely garbage-collected.
+        //
+        Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+        data.invalidate(toDelete);
+        myMetrics.removedBlocks(toDelete.length);
+        break;
+      case DNA_SHUTDOWN:
+        // shut down the data node
+        this.shutdown();
+        return false;
+      case DNA_REGISTER:
+        // namenode requested a registration
+        register();
+        lastHeartbeat=0;
+        lastBlockReport=0;
+        break;
+      case DNA_FINALIZE:
+        storage.finalizeUpgrade();
+        break;
+      default:
+        LOG.warn( "Unknown DatanodeCommand action: " + cmd.action);
+      }
+      return true;
+    }
+    
     private void transferBlocks(  Block blocks[], 
                                   DatanodeInfo xferTargets[][] 
                                 ) throws IOException {
@@ -1074,9 +1118,9 @@
     
     /** Start datanode daemon.
      */
-    public static void run(Configuration conf, String networkLoc) throws IOException {
+    public static void run(Configuration conf) throws IOException {
         String[] dataDirs = conf.getStrings("dfs.data.dir");
-        DataNode dn = makeInstance(networkLoc, dataDirs, conf);
+        DataNode dn = makeInstance(dataDirs, conf);
         if (dn != null) {
           dataNodeList.add(dn);
           Thread t = new Thread(dn, "DataNode: [" +
@@ -1087,25 +1131,34 @@
         }
     }
     
-    /**
-     * Shut down all datanodes that where started via the 
-     * run(conf,networkLoc) method.
-     * Returns only after shutdown is complete.
-     */
-    public static void shutdownAll(){
-      while (!dataNodeList.isEmpty()) {
-        dataNodeList.remove(0).shutdown();
-        dataNodeThreadList.remove(0).interrupt();
-      }
+  /**
+   * Shut down all datanodes that were started via the 
+   * run(conf) method.
+   * Returns only after shutdown is complete.
+   */
+  public static void shutdownAll(){
+    while (!dataNodeList.isEmpty()) {
+      dataNodeList.remove(0).shutdown();
+      dataNodeThreadList.remove(0).interrupt();
     }
-
+  }
 
   /** Start a single datanode daemon and wait for it to finish.
    *  If this thread is specifically interrupted, it will stop waiting.
    */
-  private static void runAndWait(Configuration conf, String networkLoc)
-    throws IOException {
-    run(conf, networkLoc);
+  static DataNode createDataNode( String args[],
+                                  Configuration conf ) throws IOException {
+    if( conf == null )
+      conf = new Configuration();
+    if( ! parseArguments( args, conf )) {
+      printUsage();
+      return null;
+    }
+    run(conf);
+    return (DataNode)dataNodeList.get(0);
+  }
+
+  void join() {
     if (dataNodeThreadList.size() > 0) {
       Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
       try {
@@ -1125,24 +1178,22 @@
    * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance( String[] dataDirs, Configuration conf)
-  throws IOException {
-    return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf );
-  }
-  
-  static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf)
+  static DataNode makeInstance( String[] dataDirs, Configuration conf )
   throws IOException {
-    ArrayList<String> dirs = new ArrayList<String>();
+    ArrayList<File> dirs = new ArrayList<File>();
     for (int i = 0; i < dataDirs.length; i++) {
       File data = new File(dataDirs[i]);
       try {
         DiskChecker.checkDir( data );
-        dirs.add(dataDirs[i]);
+        dirs.add(data);
       } catch( DiskErrorException e ) {
         LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
       }
     }
-    return ((dirs.size() > 0) ? new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null);
+    if( dirs.size() > 0 ) 
+      return new DataNode(conf, dirs);
+    LOG.error("All directories in dfs.data.dir are invalid." );
+    return null;
   }
 
   public String toString() {
@@ -1154,11 +1205,49 @@
         "}";
   }
   
+  private static void printUsage() {
+    System.err.println("Usage: java DataNode");
+    System.err.println("           [-r, --rack <network location>] |");
+    System.err.println("           [-rollback]");
+  }
+
+  /**
+   * Parse and verify command line arguments and set configuration parameters.
+   *
+   * @return false if passed arguments are incorrect
+   */
+  private static boolean parseArguments(String args[], 
+                                        Configuration conf ) {
+    int argsLen = (args == null) ? 0 : args.length;
+    StartupOption startOpt = StartupOption.REGULAR;
+    String networkLoc = null;
+    for( int i=0; i < argsLen; i++ ) {
+      String cmd = args[i];
+      if( "-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd) ) {
+        if( i==args.length-1 )
+          return false;
+        networkLoc = args[++i];
+        if( networkLoc.startsWith("-") )
+          return false;
+      } else if( "-rollback".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.ROLLBACK;
+      } else if( "-regular".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.REGULAR;
+      } else
+        return false;
+    }
+    if( networkLoc != null )
+      conf.set( "dfs.datanode.rack", NodeBase.normalize( networkLoc ));
+    conf.setObject( "dfs.datanode.startup", startOpt );
+    return true;
+  }
+
     /* Get the network location by running a script configured in conf */
     private static String getNetworkLoc( Configuration conf ) 
                           throws IOException {
         String locScript = conf.get("dfs.network.script" );
-        if( locScript == null ) return null;
+        if( locScript == null ) 
+          return NetworkTopology.DEFAULT_RACK;
 
         LOG.info( "Starting to run script to get datanode network location");
         Process p = Runtime.getRuntime().exec( locScript );
@@ -1222,49 +1311,13 @@
         return networkLoc.toString();
     }
 
-
-    /* Get the network location from the command line */
-    private static String getNetworkLoc(String args[]) {
-        for( int i=0; i< args.length; i++ ) { 
-            if ("-r".equals(args[i])||"--rack".equals(args[i]) ) {
-                if( i==args.length-1 ) {
-                    printUsage();
-                } else {
-                    return args[++i];
-                }
-            }
-        }
-        return null;
-    }
-    
-    /* Return the datanode's network location 
-     * either from the command line, from script, or a default value
-     */
-    private static String getNetworkLoc(String args[], Configuration conf)
-                          throws IOException {
-        String networkLoc = getNetworkLoc( args );
-        if( networkLoc == null ) {
-            networkLoc = getNetworkLoc( conf );
-        }
-        if( networkLoc == null ) {
-            return NetworkTopology.DEFAULT_RACK;
-        } else {
-            return NodeBase.normalize( networkLoc );
-        }
-    }
-    
-    private static void printUsage() {
-        System.err.println(
-                "Usage: java DataNode [-r, --rack <network location>]");        
-    }
-
-
     /**
      */
-    public static void main(String args[]) throws IOException {
+    public static void main(String args[]) {
       try {
-        Configuration conf = new Configuration();
-        runAndWait(conf, getNetworkLoc(args, conf));
+        DataNode datanode = createDataNode( args, null );
+        if( datanode != null )
+          datanode.join();
       } catch ( Throwable e ) {
         LOG.error( StringUtils.stringifyException( e ) );
         System.exit(-1);
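
With main() rewritten in terms of createDataNode()/join(), an embedding caller (MiniDFSCluster
is the motivating case) can plausibly drive a datanode as below; a sketch using only entry
points visible in this patch, with an illustrative data directory:

    Configuration conf = new Configuration();
    conf.set("dfs.data.dir", "/tmp/dfs/data");    // illustrative path
    DataNode dn = DataNode.createDataNode(
        new String[]{ "-rollback" }, conf );      // or "-regular", or "-r <rack>"
    if (dn != null)
      dn.join();                                  // wait for the daemon thread to exit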

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Tue Apr  3 14:39:25 2007
@@ -4,213 +4,409 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
+import java.util.Collection;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+import org.apache.hadoop.dfs.FSConstants.NodeType;
+import org.apache.hadoop.dfs.FSImage.NameNodeFile;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.fs.FileUtil.HardLink;
 
 /** 
  * Data storage information file.
  * <p>
- * During startup the datanode reads its data storage file.
- * The data storage file is stored in all the dfs.data.dir directories.
- * It contains version and storageID.
- * Datanode holds a lock on all the dataStorage files while it runs so that other 
- * datanodes were not able to start working with the same data storage.
- * The locks are released when the datanode stops (normally or abnormally).
- * 
+ * @see Storage
  * @author Konstantin Shvachko
  */
-class DataStorage {
-  public static final String STORAGE_INFO_FILE_NAME = "storage";
-  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataStorage");
-
-  // persistent fields
-  private int version = 0;  /// stored version
-  private String storageID; /// unique per cluster storageID
+class DataStorage extends Storage {
+  // Constants
+  final static String BLOCK_SUBDIR_PREFIX = "subdir";
+  final static String BLOCK_FILE_PREFIX = "blk_";
   
-  // non persistent fields
-  private ArrayList storageFiles = new ArrayList();
-  private ArrayList storageLocks = new ArrayList();
+  private String storageID;
+
+  DataStorage() {
+    super( NodeType.DATA_NODE );
+    storageID = "";
+  }
   
-  // cache away the names of all passed in dirs
-  private File[] origDirs = null;
+  DataStorage( int nsID, long cT, String strgID ) {
+    super( NodeType.DATA_NODE, nsID, cT );
+    this.storageID = strgID;
+  }
   
-  // cache away the names of locked dirs
-  private File[] dirs = null;
+  DataStorage( StorageInfo storageInfo, String strgID ) {
+    super( NodeType.DATA_NODE, storageInfo );
+    this.storageID = strgID;
+  }
+
+  String getStorageID() {
+    return storageID;
+  }
   
-  private int numLocked = 0;
+  void setStorageID( String newStorageID ) {
+    this.storageID = newStorageID;
+  }
   
   /**
-   * Create DataStorage and verify its version.
+   * Analyze storage directories.
+   * Recover from previous transitions if required. 
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info. 
    * 
+   * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
+   * @param startOpt startup option
    * @throws IOException
    */
-  public DataStorage( File[] dataDirs ) throws IOException {
-    this( DataNode.DFS_CURRENT_VERSION, dataDirs );
+  void recoverTransitionRead( NamespaceInfo nsInfo,
+                              Collection<File> dataDirs,
+                              StartupOption startOpt
+                            ) throws IOException {
+    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+      "Data-node and name-node layout versions must be the same.";
     
-    if( version < FSConstants.DFS_CURRENT_VERSION ) // future version
-      throw new IncorrectVersionException( version, "data storage" );
-  }
-  
-  /**
-   * Create DataStorage.
-   * 
-   * Read data storage files if they exist or create them if not.
-   * Lock the files.
-   * 
-   * @param curVersion can be used to read file saved with a previous version.
-   * @param dataDirs Array of data storage directories
-   * @throws IOException
-   */
-  public DataStorage( int curVersion, File[] dataDirs ) throws IOException {
-    this.version = curVersion;
-    this.origDirs = dataDirs;
-    for (int idx = 0; idx < dataDirs.length; idx++) {
-      storageFiles.add(idx, new RandomAccessFile( 
-                          new File(dataDirs[idx], STORAGE_INFO_FILE_NAME ), 
-                          "rws" ));
-      lock(idx);
-      boolean needToSave;
+    // 1. For each data directory calculate its state and 
+    // check whether all is consistent before transitioning.
+    // Format and recover.
+    this.storageID = "";
+    this.storageDirs = new ArrayList<StorageDirectory>( dataDirs.size() );
+    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>( dataDirs.size() );
+    for( Iterator<File> it = dataDirs.iterator(); it.hasNext(); ) {
+      File dataDir = it.next();
+      StorageDirectory sd = new StorageDirectory( dataDir );
+      StorageState curState;
       try {
-        needToSave = read(idx);
-      } catch( java.io.EOFException e ) {
-        storageID = "";
-        needToSave = true;
-      }
-    
-      if( needToSave ) { write(idx); }
-      
-      RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
-      if (file != null) { numLocked++; }
-    }
-    if (numLocked > 0) {
-      this.dirs = new File[numLocked];
-      int curidx = 0;
-      for (int idx = 0; idx < dataDirs.length; idx++) {
-        if (storageFiles.get(idx) != null) {
-          dirs[curidx] = dataDirs[idx];
-          curidx++;
+        curState = sd.analyzeStorage( startOpt );
+        // sd is locked but not opened
+        switch( curState ) {
+        case NORMAL:
+          break;
+        case NON_EXISTENT:
+          // ignore this storage
+          LOG.info( "Storage directory " + dataDir + " does not exist." );
+          it.remove();
+          continue;
+        case CONVERT:
+          convertLayout( sd, nsInfo );
+          break;
+        case NOT_FORMATTED: // format
+          LOG.info( "Storage directory " + dataDir + " is not formatted." );
+          LOG.info( "Formatting ..." );
+          format( sd, nsInfo );
+          break;
+        default:  // recovery part is common
+          sd.doRecover( curState );
         }
+      } catch (IOException ioe) {
+        sd.unlock();
+        throw ioe;
       }
+      // add to the storage list
+      addStorageDir( sd );
+      dataDirStates.add( curState );
     }
-  }
-  
-  public int getVersion() {
-    return version;
-  }
 
-  public String getStorageID() {
-    return storageID;
+    if( dataDirs.size() == 0 )  // none of the data dirs exist
+      throw new IOException( 
+          "All specified directories are not accessible or do not exist." );
+
+    // 2. Do transitions
+    // Each storage directory is treated individually.
+    // During startup some of them can upgrade or rollback 
+    // while others could be up-to-date for the regular startup.
+    for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      doTransition( getStorageDir( idx ), nsInfo, startOpt );
+      assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
+        "Data-node and name-node layout versions must be the same.";
+      assert this.getCTime() == nsInfo.getCTime() :
+        "Data-node and name-node CTimes must be the same.";
+    }
+    
+    // 3. Update all storages. Some of them might have just been formatted.
+    this.writeAll();
   }
-  
-  public int getNumLocked() {
-    return numLocked;
+
+  void format( StorageDirectory sd, NamespaceInfo nsInfo ) throws IOException {
+    sd.clearDirectory(); // create directory
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = nsInfo.getNamespaceID();
+    this.cTime = 0;
+    // store storageID as it currently is
+    sd.write();
   }
-  
-  public File[] getLockedDirs() {
-    return dirs;
+
+  protected void setFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    super.setFields( props, sd );
+    props.setProperty( "storageID", storageID );
   }
-  
-  public void setStorageID( String newStorageID ) {
-    this.storageID = newStorageID;
+
+  protected void getFields( Properties props, 
+                            StorageDirectory sd 
+                          ) throws IOException {
+    super.getFields( props, sd );
+    String ssid = props.getProperty( "storageID" );
+    if( ssid == null ||
+        ! ("".equals( storageID ) || "".equals( ssid ) ||
+            storageID.equals( ssid )))
+      throw new InconsistentFSStateException( sd.root,
+                  "has incompatible storage Id." );
+    if( "".equals( storageID ) ) // update id only if it was empty
+      storageID = ssid;
   }
-  
-  public void setVersion( int newVersion ) {
-    this.version = newVersion;
+
+  boolean isConversionNeeded( StorageDirectory sd ) throws IOException {
+    File oldF = new File( sd.root, "storage" );
+    if( ! oldF.exists() )
+      return false;
+    // check consistency of the old storage
+    File oldDataDir = new File( sd.root, "data" );
+    if( ! oldDataDir.exists() ) 
+      throw new InconsistentFSStateException( sd.root,
+          "Old layout block directory " + oldDataDir + " is missing" ); 
+    if( ! oldDataDir.isDirectory() )
+      throw new InconsistentFSStateException( sd.root,
+          oldDataDir + " is not a directory." );
+    if( ! oldDataDir.canWrite() )
+      throw new InconsistentFSStateException( sd.root,
+          oldDataDir + " is not writable." );
+    return true;
   }
   
   /**
-   * Lock datastorage file.
+   * Automatic conversion from the old layout version to the new one.
    * 
+   * @param sd storage directory
+   * @param nsInfo namespace information
    * @throws IOException
    */
-  private void lock(int idx) throws IOException {
-    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
-    FileLock lock = file.getChannel().tryLock();
-    if (lock == null) {
-      // log a warning
-      LOG.warn("Cannot lock storage file in directory "+origDirs[idx].getName());
-      // remove the file from fileList, and close it
-      storageFiles.add(idx, null);
-      file.close();
+  private void convertLayout( StorageDirectory sd,
+                              NamespaceInfo nsInfo 
+                            ) throws IOException {
+    assert FSConstants.LAYOUT_VERSION < LAST_PRE_UPGRADE_LAYOUT_VERSION :
+      "Bad current layout version: FSConstants.LAYOUT_VERSION should decrease";
+    File oldF = new File( sd.root, "storage" );
+    File oldDataDir = new File( sd.root, "data" );
+    assert oldF.exists() : "Old datanode layout \"storage\" file is missing";
+    assert oldDataDir.exists() : "Old layout block directory \"data\" is missing";
+    LOG.info( "Old layout version file " + oldF
+            + " is found. New layout version is "
+            + FSConstants.LAYOUT_VERSION );
+    LOG.info( "Converting ..." );
+    
+    // Lock and Read old storage file
+    RandomAccessFile oldFile = new RandomAccessFile( oldF, "rws" );
+    if (oldFile == null)
+      throw new IOException( "Cannot read file: " + oldF );
+    FileLock oldLock = oldFile.getChannel().tryLock();
+    if (oldLock == null)
+      throw new IOException( "Cannot lock file: " + oldF );
+    try {
+      oldFile.seek(0);
+      int oldVersion = oldFile.readInt();
+      if( oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION )
+        throw new IncorrectVersionException( oldVersion, "file " + oldF,
+                                              LAST_PRE_UPGRADE_LAYOUT_VERSION );
+      String oldStorageID = org.apache.hadoop.io.UTF8.readString( oldFile );
+  
+      // check new storage
+      File newDataDir = sd.getCurrentDir();
+      File versionF = sd.getVersionFile();
+      if( versionF.exists() )
+        throw new IOException( "Version file already exists: " + versionF );
+      if( newDataDir.exists() ) // somebody created current dir manually
+        deleteDir( newDataDir );
+      // Write new layout
+      rename( oldDataDir, newDataDir );
+  
+      this.layoutVersion = FSConstants.LAYOUT_VERSION;
+      this.namespaceID = nsInfo.getNamespaceID();
+      this.cTime = 0;
+      this.storageID = oldStorageID;
+      sd.write();
+      // close and unlock old file
+    } finally {
+      oldLock.release();
+      oldFile.close();
     }
-    storageLocks.add(idx, lock);
+    // move old storage file into current dir
+    rename( oldF, new File( sd.getCurrentDir(), "storage" ));
+    LOG.info( "Conversion of " + oldF + " is complete." );
   }
-  
+
   /**
-   * Unlock datastorage file.
-   * @param idx File index
+   * Analyze whether a transition of the fs state is required
+   * and perform it if necessary.
    * 
+   * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
+   * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
+   * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
+   * 
+   * @param sd  storage directory
+   * @param nsInfo  namespace info
+   * @param startOpt  startup option
    * @throws IOException
    */
-  private void unlock(int idx) throws IOException {
-    FileLock lock = (FileLock) storageLocks.get(idx);
-    if (lock != null) { lock.release(); }
-  }
-  
-  /**
-   * Close a datastorage file.
-   * @param idx file index
-   * @throws IOException
-   */
-  private void close(int idx) throws IOException {
-    FileLock lock = (FileLock) storageLocks.get(idx);
-    if (lock == null) { return; }
-    lock.release();
-    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
-    file.close();
-  }
-  
-  /**
-   * Close all datastorage files.
-   * @throws IOException
-   */
-  public void closeAll() throws IOException {
-    for (int idx = 0; idx < dirs.length; idx++) {
-      close(idx);
+  private void doTransition(  StorageDirectory sd, 
+                              NamespaceInfo nsInfo, 
+                              StartupOption startOpt
+                            ) throws IOException {
+    if( startOpt == StartupOption.ROLLBACK )
+      doRollback( sd, nsInfo ); // rollback if applicable
+    sd.read();
+    assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
+      "Future version is not allowed";
+    if( getNamespaceID() != nsInfo.getNamespaceID() )
+      throw new IOException( 
+          "Incompatible namespaceIDs in " + sd.root.getCanonicalPath()
+          + ": namenode namespaceID = " + nsInfo.getNamespaceID() 
+          + "; datanode namespaceID = " + getNamespaceID() );
+    if( this.layoutVersion == FSConstants.LAYOUT_VERSION 
+        && this.cTime == nsInfo.getCTime() )
+      return; // regular startup
+    if( this.layoutVersion > FSConstants.LAYOUT_VERSION
+        || this.cTime < nsInfo.getCTime() ) {
+      doUpgrade( sd, nsInfo );  // upgrade
+      return;
     }
+    // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
+    // must shutdown
+    throw new IOException("Datanode state: LV = " + this.getLayoutVersion() 
+                          + " CTime = " + this.getCTime() 
+                          + " is newer than the namespace state: LV = "
+                          + nsInfo.getLayoutVersion() 
+                          + " CTime = " + nsInfo.getCTime() );
   }
-  
+
   /**
-   * Read data storage file.
-   * @param idx File index
-   * @return whether the data storage file need to be updated.
+   * Move current storage into a backup directory,
+   * and hardlink all its blocks into the new current directory.
+   * 
+   * @param sd  storage directory
    * @throws IOException
    */
-  private boolean read(int idx) throws IOException {
-    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
-    if (file == null) { return false; }
-    file.seek(0);
-    this.version = file.readInt();
-    this.storageID = UTF8.readString( file );
-    return false;
+  void doUpgrade( StorageDirectory sd,
+                  NamespaceInfo nsInfo
+                ) throws IOException {
+    LOG.info( "Upgrading storage directory " + sd.root 
+            + ".\n   old LV = " + this.getLayoutVersion()
+            + "; old CTime = " + this.getCTime()
+            + ".\n   new LV = " + nsInfo.getLayoutVersion()
+            + "; new CTime = " + nsInfo.getCTime() );
+    File curDir = sd.getCurrentDir();
+    File prevDir = sd.getPreviousDir();
+    assert curDir.exists() : "Current directory must exist.";
+    // delete previous dir before upgrading
+    if( prevDir.exists() )
+      deleteDir( prevDir );
+    File tmpDir = sd.getPreviousTmp();
+    assert ! tmpDir.exists() : "previous.tmp directory must not exist.";
+    // rename current to tmp
+    rename( curDir, tmpDir );
+    // hardlink blocks
+    linkBlocks( tmpDir, curDir );
+    // write version file
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    assert this.namespaceID == nsInfo.getNamespaceID() :
+      "Data-node and name-node layout versions must be the same.";
+    this.cTime = nsInfo.getCTime();
+    sd.write();
+    // rename tmp to previous
+    rename( tmpDir, prevDir );
+    LOG.info( "Upgrade of " + sd.root + " is complete." );
   }
 
-  /**
-   * Write data storage file.
-   * @param idx File index
-   * @throws IOException
-   */
-  private void write(int idx) throws IOException {
-    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
-    if (file == null) { return; }
-    file.seek(0);
-    file.writeInt( this.version );
-    UTF8.writeString( file, this.storageID );
+  void doRollback(  StorageDirectory sd,
+                    NamespaceInfo nsInfo
+                  ) throws IOException {
+    File prevDir = sd.getPreviousDir();
+    // regular startup if previous dir does not exist
+    if( ! prevDir.exists() )
+      return;
+    DataStorage prevInfo = new DataStorage();
+    StorageDirectory prevSD = prevInfo.new StorageDirectory( sd.root );
+    prevSD.read( prevSD.getPreviousVersionFile() );
+
+    // We allow rollback to a state, which is either consistent with
+    // the namespace state or can be further upgraded to it.
+    if( ! ( prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
+        && prevInfo.getCTime() <= nsInfo.getCTime() ))  // cannot rollback
+      throw new InconsistentFSStateException( prevSD.root,
+          "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
+          + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
+          + " is newer than the namespace state: LV = "
+          + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime() );
+    LOG.info( "Rolling back storage directory " + sd.root 
+        + ".\n   target LV = " + nsInfo.getLayoutVersion()
+        + "; target CTime = " + nsInfo.getCTime() );
+    File tmpDir = sd.getRemovedTmp();
+    assert ! tmpDir.exists() : "removed.tmp directory must not exist.";
+    // rename current to tmp
+    File curDir = sd.getCurrentDir();
+    assert curDir.exists() : "Current directory must exist.";
+    rename( curDir, tmpDir );
+    // rename previous to current
+    rename( prevDir, curDir );
+    // delete tmp dir
+    deleteDir( tmpDir );
+    LOG.info( "Rollback of " + sd.root + " is complete." );
+  }
+
+  void doFinalize( StorageDirectory sd ) throws IOException {
+    File prevDir = sd.getPreviousDir();
+    if( ! prevDir.exists() )
+      return; // already discarded
+    final String dataDirPath = sd.root.getCanonicalPath();
+    LOG.info( "Finalizing upgrade for storage directory " 
+            + dataDirPath 
+            + ".\n   cur LV = " + this.getLayoutVersion()
+            + "; cur CTime = " + this.getCTime() );
+    assert sd.getCurrentDir().exists() : "Current directory must exist.";
+    final File tmpDir = sd.getFinalizedTmp();
+    // rename previous to tmp
+    rename( prevDir, tmpDir );
+
+    // delete tmp dir in a separate thread
+    new Daemon( new Runnable() {
+      public void run() {
+        try {
+          deleteDir( tmpDir );
+        } catch( IOException ex ) {
+          LOG.error( "Finalize upgrade for " + dataDirPath + " failed.", ex );
+        }
+        LOG.info( "Finalize upgrade for " + dataDirPath + " is complete." );
+      }
+      public String toString() { return "Finalize " + dataDirPath; }
+    }).start();
   }
   
-  /**
-   * Write all data storage files.
-   * @throws IOException
-   */
-  public void writeAll() throws IOException {
-    for (int idx = 0; idx < dirs.length; idx++) {
-      write(idx);
+  void finalizeUpgrade() throws IOException {
+    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
+      doFinalize( it.next() );
     }
   }
   
+  static void linkBlocks( File from, File to ) throws IOException {
+    if( ! from.isDirectory() ) {
+      HardLink.createHardLink( from, to );
+      return;
+    }
+    // from is a directory
+    if( ! to.mkdir() )
+      throw new IOException("Cannot create directory " + to );
+    String[] blockNames = from.list( new java.io.FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        return name.startsWith( BLOCK_SUBDIR_PREFIX ) 
+            || name.startsWith( BLOCK_FILE_PREFIX );
+      }
+    });
+    
+    for( int i = 0; i < blockNames.length; i++ )
+      linkBlocks( new File(from, blockNames[i]), new File(to, blockNames[i]) );
+  }
 }

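For context on the new transitions in DataStorage: doRollback() restores the
pre-upgrade state purely through directory renames, so a crash part-way
through leaves a layout the startup recovery code can still classify. Below
is a minimal standalone sketch of the same rename sequence, using plain
java.io and the directory names this layout is understood to use ("current",
"previous", "removed.tmp"); it is an illustration, not code from this patch.

    import java.io.File;
    import java.io.IOException;

    public class RollbackSketch {
      // current -> removed.tmp, previous -> current, delete removed.tmp
      static void rollback(File root) throws IOException {
        File current = new File(root, "current");
        File previous = new File(root, "previous");
        File removedTmp = new File(root, "removed.tmp");
        if (!previous.exists())
          return;                      // regular startup: nothing to roll back
        if (!current.renameTo(removedTmp))
          throw new IOException("cannot rename " + current);
        if (!previous.renameTo(current))
          throw new IOException("cannot rename " + previous);
        fullyDelete(removedTmp);       // discard the rolled-back state
      }

      static void fullyDelete(File f) throws IOException {
        File[] children = f.listFiles();
        if (children != null)
          for (File c : children) fullyDelete(c);
        if (!f.delete())
          throw new IOException("cannot delete " + f);
      }
    }
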
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Tue Apr  3 14:39:25 2007
@@ -31,9 +31,15 @@
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
-  public static final long versionID = 5L;  // register takes a new parameter
+  /*
+   * 6: versionRequest() added;
+   * sendHeartbeat() and blockReport() return DatanodeCommand;
+   * DatanodeRegistration contains StorageInfo
+   */
+  public static final long versionID = 6L;
   
   // error code
+  final static int NOTIFY = 0;
   final static int DISK_ERROR = 1;
   final static int INVALID_BLOCK = 2;
 
@@ -45,7 +51,8 @@
                               DNA_TRANSFER,   // transfer blocks to another datanode
                               DNA_INVALIDATE, // invalidate blocks
                               DNA_SHUTDOWN,   // shutdown node
-                              DNA_REGISTER; }   // re-register
+                              DNA_REGISTER,   // re-register
+                              DNA_FINALIZE; } // finalize previous upgrade
 
   /** 
    * Register Datanode.
@@ -63,14 +70,14 @@
     /**
      * sendHeartbeat() tells the NameNode that the DataNode is still
      * alive and well.  Includes some status info, too. 
-     * It also gives the NameNode a chance to return a "BlockCommand" object.
-     * A BlockCommand tells the DataNode to invalidate local block(s), 
+     * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+     * A DatanodeCommand tells the DataNode to invalidate local block(s), 
      * or to copy them to other DataNodes, etc.
      */
-    public BlockCommand sendHeartbeat(DatanodeRegistration registration,
-                                      long capacity, long remaining,
-                                      int xmitsInProgress,
-                                      int xceiverCount) throws IOException;
+    public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
+                                          long capacity, long remaining,
+                                          int xmitsInProgress,
+                                          int xceiverCount) throws IOException;
 
     /**
      * blockReport() tells the NameNode about all the locally-stored blocks.
@@ -79,8 +86,8 @@
      * the locally-stored blocks.  It's invoked upon startup and then
      * infrequently afterwards.
      */
-    public Block[] blockReport( DatanodeRegistration registration,
-                                Block blocks[]) throws IOException;
+    public DatanodeCommand blockReport( DatanodeRegistration registration,
+                                        Block blocks[]) throws IOException;
     
     /**
      * blockReceived() allows the DataNode to tell the NameNode about
@@ -98,4 +105,6 @@
     public void errorReport(DatanodeRegistration registration,
                             int errorCode, 
                             String msg) throws IOException;
+    
+    public NamespaceInfo versionRequest() throws IOException;
 }

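The new versionRequest() call lets a starting datanode fetch the namenode's
NamespaceInfo (layout version, namespace ID, cTime) before registering, which
is what drives the storage transitions above. A hedged sketch of such a
handshake follows; the surrounding method is illustrative, not code from this
patch.

    // Illustrative only: compare the namenode's layout version with this
    // software's own before going on to register.
    void handshake(DatanodeProtocol namenode) throws IOException {
      NamespaceInfo nsInfo = namenode.versionRequest();
      if (nsInfo.getLayoutVersion() != FSConstants.LAYOUT_VERSION)
        throw new IOException("Layout version mismatch: namenode LV = "
            + nsInfo.getLayoutVersion() + ", software LV = "
            + FSConstants.LAYOUT_VERSION);
      // with matching versions the datanode can run its storage
      // transitions against nsInfo and then call register()
    }
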
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java Tue Apr  3 14:39:25 2007
@@ -25,40 +25,50 @@
        });
   }
 
-  int version;            /// current Datanode version
-  String registrationID;  /// a unique per namenode id; indicates   
-                          /// the namenode the datanode is registered with
+  StorageInfo storageInfo;
 
   /**
    * Default constructor.
    */
   public DatanodeRegistration() {
-    this( 0, null, null, -1, null );
+    super( null, null, -1 );
+    this.storageInfo = new StorageInfo();
   }
   
   /**
    * Create DatanodeRegistration
    */
+  public DatanodeRegistration(String nodeName, 
+                              int infoPort,
+                              DataStorage storage ) {
+    super( nodeName, storage.getStorageID(), infoPort );
+    this.storageInfo = new StorageInfo( storage );
+  }
+
+  /**
+   * Create DatanodeRegistration
+   * @deprecated 
+   * use {@link #DatanodeRegistration(String, int, DataStorage)} instead
+   */
   public DatanodeRegistration(int version, 
                               String nodeName, 
                               String storageID,
                               int infoPort,
                               String registrationID ) {
     super( nodeName, storageID, infoPort );
-    this.version = version;
-    this.registrationID = registrationID;
+    this.storageInfo = new StorageInfo();
   }
 
   /**
    */
   public int getVersion() {
-    return version;
+    return storageInfo.getLayoutVersion();
   }
   
   /**
    */
   public String getRegistrationID() {
-    return registrationID;
+    return Storage.getRegistrationID( storageInfo );
   }
 
   /////////////////////////////////////////////////
@@ -67,16 +77,18 @@
   /**
    */
   public void write(DataOutput out) throws IOException {
-    out.writeInt(this.version);
     super.write( out );
-    UTF8.writeString(out, registrationID);
+    out.writeInt( storageInfo.getLayoutVersion() );
+    out.writeInt( storageInfo.getNamespaceID() );
+    out.writeLong( storageInfo.getCTime() );
   }
 
   /**
    */
   public void readFields(DataInput in) throws IOException {
-    this.version = in.readInt();
     super.readFields(in);
-    this.registrationID = UTF8.readString(in);   
+    storageInfo.layoutVersion = in.readInt();
+    storageInfo.namespaceID = in.readInt();
+    storageInfo.cTime = in.readLong();
   }
 }

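After this change the wire format is the superclass's fields followed by the
three StorageInfo fields: layoutVersion (int), namespaceID (int), and cTime
(long). A small hypothetical round-trip of just those three fields, to make
the encoding concrete (the values are made up):

    import java.io.*;

    public class StorageInfoRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(-4);      // layoutVersion, i.e. LAYOUT_VERSION
        out.writeInt(12345);   // namespaceID
        out.writeLong(0L);     // cTime
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readInt() + " " + in.readInt()
            + " " + in.readLong());    // prints: -4 12345 0
      }
    }
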
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Apr  3 14:39:25 2007
@@ -304,6 +304,14 @@
     }
 
     /**
+     * Finalize previously upgraded file system state.
+     * @throws IOException
+     */
+    public void finalizeUpgrade() throws IOException {
+      dfs.finalizeUpgrade();
+    }
+
+    /**
      * We need to find the blocks that didn't match.  Likely only one 
      * is corrupt but we will report both to the namenode.  In the future,
      * we can consider figuring out exactly which block is corrupt.
@@ -400,6 +408,14 @@
     public void refreshNodes() throws IOException {
       ((RawDistributedFileSystem)fs).refreshNodes();
     }
+
+    /**
+     * Finalize previously upgraded file system state.
+     */
+    public void finalizeUpgrade() throws IOException {
+      ((RawDistributedFileSystem)fs).finalizeUpgrade();
+    }
+
     /**
      * We need to find the blocks that didn't match.  Likely only one 
      * is corrupt but we will report both to the namenode.  In the future,

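finalizeUpgrade() is plumbed through both the raw and the checksummed file
system so administrative clients (DFSAdmin is updated in this same commit)
can commit an upgrade, after which the previous state is discarded and
rollback is no longer possible. A hedged usage sketch, not code from this
patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DistributedFileSystem;
    import org.apache.hadoop.fs.FileSystem;

    public class FinalizeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (fs instanceof DistributedFileSystem)   // finalize is irreversible
          ((DistributedFileSystem) fs).finalizeUpgrade();
      }
    }
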
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue Apr  3 14:39:25 2007
@@ -122,11 +122,23 @@
     // SafeMode actions
     public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
+    // Startup options
+    public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
+
+    /**
+     * Type of the node
+     */
+    static public enum NodeType {
+      NAME_NODE,
+      DATA_NODE;
+    }
+
     // Version is reflected in the dfs image and edit log files.
     // Version is reflected in the data storage file.
     // Versions are negative.
-    // Decrement DFS_CURRENT_VERSION to define a new version.
-    public static final int DFS_CURRENT_VERSION = -3;
-    // Current version: New operations OP_DATANODE_REMOVE and OP_DATANODE_ADD
-    // are introduced
+    // Decrement LAYOUT_VERSION to define a new version.
+    public static final int LAYOUT_VERSION = -4;
+    // Current version: 
+    // Top level directory is reorganized to allow file system state 
+    // transitions: upgrade, rollback, and finalize.
 }

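Because layout versions are negative and each new layout decrements the
constant, "newer" means numerically smaller. The rollback guard in
DataStorage and the edit-log version check elsewhere in this commit both rely
on that ordering; in sketch form:

    // Stored state is readable (same age as, or older than, the software)
    // exactly when its layout version is numerically >= LAYOUT_VERSION.
    static boolean canRead(int storedLV) {
      return storedLV >= FSConstants.LAYOUT_VERSION;   // e.g. -3 >= -4
    }
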
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue Apr  3 14:39:25 2007
@@ -118,7 +118,7 @@
             if ( children == null || children.length == 0 ) {
               children = new FSDir[maxBlocksPerDir];
               for (int idx = 0; idx < maxBlocksPerDir; idx++) {
-                children[idx] = new FSDir( new File(dir, "subdir"+idx) );
+                children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
               }
             }
             
@@ -256,19 +256,19 @@
     class FSVolume {
       static final double USABLE_DISK_PCT_DEFAULT = 0.98f; 
 
-      private File dir;
+      private File dir; // TODO this field is redundant; it equals this.dataDir.dir.getParent()
       private FSDir dataDir;
       private File tmpDir;
       private DF usage;
       private long reserved;
       private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
     
-      FSVolume(File dir, Configuration conf) throws IOException {
+      FSVolume( File currentDir, Configuration conf) throws IOException {
         this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
         this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
             (float) USABLE_DISK_PCT_DEFAULT);
-        this.dir = dir;
-        this.dataDir = new FSDir(new File(dir, "data"));
+        this.dir = currentDir.getParentFile();
+        this.dataDir = new FSDir( currentDir );
         this.tmpDir = new File(dir, "tmp");
         if (tmpDir.exists()) {
           FileUtil.fullyDelete(tmpDir);
@@ -431,11 +431,11 @@
     /**
      * An FSDataset has a directory where it loads its data files.
      */
-    public FSDataset(File[] dirs, Configuration conf) throws IOException {
+    public FSDataset( DataStorage storage, Configuration conf) throws IOException {
     	this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
-        FSVolume[] volArray = new FSVolume[dirs.length];
-        for (int idx = 0; idx < dirs.length; idx++) {
-          volArray[idx] = new FSVolume(dirs[idx], conf);
+        FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
+        for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+          volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
         }
         volumes = new FSVolumeSet(volArray);
         volumeMap = new HashMap<Block,FSVolume>();

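FSDataset now works against the current/ subdirectory of each volume rather
than the data directory root. The per-volume layout implied by the code in
this commit (directory names taken or inferred from the methods above) is
roughly:

    ${dfs.data.dir}/
        current/         block files plus subdir0..subdirN (FSVolume.dataDir)
        tmp/             blocks in the process of being written
        previous/        present only while an upgrade is pending
        removed.tmp/     transient, while a rollback is in progress
        finalized.tmp/   transient, while a finalize is in progress
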
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Tue Apr  3 14:39:25 2007
@@ -22,7 +22,6 @@
 import java.io.*;
 import java.util.*;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -315,13 +314,12 @@
     TreeMap activeLocks = new TreeMap();
     FSImage fsImage;  
     boolean ready = false;
-    int namespaceID = 0;    // TODO: move to FSImage class, it belongs there
     // Metrics record
     private MetricsRecord directoryMetrics = null;
     
     /** Access an existing dfs name directory. */
-    public FSDirectory(File[] dirs) throws IOException {
-      this.fsImage = new FSImage( dirs );
+    public FSDirectory() throws IOException {
+      this.fsImage = new FSImage();
       initialize();
     }
 
@@ -335,12 +333,23 @@
       directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
     }
 
-    void loadFSImage( Configuration conf ) throws IOException {
-      fsImage.loadFSImage( conf );
+    void loadFSImage( Collection<File> dataDirs,
+                      StartupOption startOpt ) throws IOException {
+      // format before starting up if requested
+      if( startOpt == StartupOption.FORMAT ) {
+        fsImage.setStorageDirectories( dataDirs );
+        fsImage.format();
+        startOpt = StartupOption.REGULAR;
+      }
+      try {
+        fsImage.recoverTransitionRead( dataDirs, startOpt );
+      } catch( IOException e ) {
+        fsImage.close();
+        throw e;
+      }
       synchronized (this) {
         this.ready = true;
         this.notifyAll();
-        fsImage.getEditLog().create();
       }
     }
 
@@ -353,7 +362,7 @@
      * Shutdown the filestore
      */
     public void close() throws IOException {
-        fsImage.getEditLog().close();
+        fsImage.close();
     }
 
     /**

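loadFSImage() now folds formatting into the normal startup path: with
StartupOption.FORMAT it formats the given directories and then proceeds as
REGULAR, while UPGRADE and ROLLBACK are handled inside
recoverTransitionRead(). A hypothetical caller, assuming it lives in the
org.apache.hadoop.dfs package (the paths are made up):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collection;

    public class StartupSketch {
      public static void main(String[] args) throws Exception {
        Collection<File> nameDirs = Arrays.asList(
            new File("/data/dfs/name1"), new File("/data/dfs/name2"));
        FSDirectory fsDir = new FSDirectory();
        fsDir.loadFSImage(nameDirs, FSConstants.StartupOption.REGULAR);
      }
    }
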
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Apr  3 14:39:25 2007
@@ -26,10 +26,8 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Vector;
+import java.util.ArrayList;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
@@ -48,60 +46,86 @@
   private static final byte OP_DATANODE_ADD = 5;
   private static final byte OP_DATANODE_REMOVE = 6;
 
-  private static final String FS_EDIT = "edits";
-  private static final String FS_EDIT_NEW = "edits.new";
+  private ArrayList<EditLogOutputStream> editStreams = null;
+  private FSImage fsimage = null;
   
-  private File[] editFiles = null;
-  private File[] editFilesNew = null;
+  static class EditLogOutputStream extends DataOutputStream {
+    private FileDescriptor fd;
 
-  DataOutputStream[] editStreams = null;
-  FileDescriptor[] editDescriptors = null;
-  private FSImage fsimage = null;
+    EditLogOutputStream( File name ) throws IOException {
+      super( new FileOutputStream( name, true )); // open for append
+      this.fd = ((FileOutputStream)out).getFD();
+    }
 
-  FSEditLog(File[] fsDirs, FSImage image)  throws IOException {
-    fsimage = image;
-    editFiles = new File[fsDirs.length];
-    editFilesNew = new File[fsDirs.length];
-    for (int idx = 0; idx < fsDirs.length; idx++) {
-       editFiles[idx] = new File(fsDirs[idx], FS_EDIT);
-       editFilesNew[idx] = new File(fsDirs[idx], FS_EDIT_NEW);
-     }
-   }
+    void flushAndSync() throws IOException {
+      this.flush();
+      this.fd.sync();
+    }
+
+    void create() throws IOException {
+      writeInt( FSConstants.LAYOUT_VERSION );
+      flushAndSync();
+    }
+  }
 
-  FSEditLog(File imageDir, FSImage image, String edits)  throws IOException {
+  FSEditLog( FSImage image ) {
     fsimage = image;
-    editFiles = new File[1];
-    editFiles[0] = new File(imageDir, edits);
-   }
+  }
+
+  private File getEditFile( int idx ) {
+    return fsimage.getEditFile( idx );
+  }
+
+  private File getEditNewFile( int idx ) {
+    return fsimage.getEditNewFile( idx );
+  }
+  
+  private int getNumStorageDirs() {
+    return fsimage.getNumStorageDirs();
+  }
+  
+  int getNumEditStreams() {
+    return editStreams == null ? 0 : editStreams.size();
+  }
 
   /**
+   * Create empty edit log files.
    * Initialize the output stream for logging.
    * 
    * @throws IOException
    */
-  void create() throws IOException {
-    editStreams = new DataOutputStream[editFiles.length];
-    editDescriptors = new FileDescriptor[editFiles.length];
-    for (int idx = 0; idx < editStreams.length; idx++) {
-      FileOutputStream stream = new FileOutputStream(editFiles[idx]);
-      editStreams[idx] = new DataOutputStream(stream);
-      editDescriptors[idx] = stream.getFD();
-      editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
+  void open() throws IOException {
+    int size = getNumStorageDirs();
+    if( editStreams == null )
+      editStreams = new ArrayList<EditLogOutputStream>( size );
+    for (int idx = 0; idx < size; idx++) {
+      File eFile = getEditFile( idx );
+      try {
+        EditLogOutputStream eStream = new EditLogOutputStream( eFile );
+        editStreams.add( eStream );
+      } catch (IOException e) {
+        FSNamesystem.LOG.warn( "Unable to open edit log file " + eFile );
+        processIOError(idx); 
+        idx--; 
+      }
     }
   }
 
+  void createEditLogFile( File name ) throws IOException {
+    EditLogOutputStream eStream = new EditLogOutputStream( name );
+    eStream.create();
+    eStream.flushAndSync();
+    eStream.close();
+  }
+
   /**
   * Create edits.new if it does not exist.
    */
   void createNewIfMissing() throws IOException {
-    for (int idx = 0; idx < editFilesNew.length; idx++) {
-      if (!editFilesNew[idx].exists()) {
-        FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
-        DataOutputStream editStr = new DataOutputStream(stream);
-        editStr.writeInt( FSConstants.DFS_CURRENT_VERSION );
-        editStr.flush();
-        editStr.close();
-      } 
+    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+      File newFile = getEditNewFile( idx );
+      if( ! newFile.exists() )
+        createEditLogFile( newFile );
     }
   }
   
@@ -112,16 +136,17 @@
     if (editStreams == null) {
       return;
     }
-    for (int idx = 0; idx < editStreams.length; idx++) {
+    for (int idx = 0; idx < editStreams.size(); idx++) {
+      EditLogOutputStream eStream = editStreams.get( idx );
       try {
-        editStreams[idx].flush();
-        editDescriptors[idx].sync();
-        editStreams[idx].close();
+        eStream.flushAndSync();
+        eStream.close();
       } catch (IOException e) {
         processIOError(idx);
         idx--;
       }
     }
+    editStreams.clear();
   }
 
   /**
@@ -131,42 +156,13 @@
    * server to exit
    */
    void processIOError(int index) throws IOException {
-     if (editStreams == null || editStreams.length == 1) {
+     if (editStreams == null || editStreams.size() == 1) {
        throw new IOException("Checkpoint directories inaccessible.");
      }
-     assert(index < editFiles.length);
-     assert(editFiles.length == editFilesNew.length);
-     assert(editFiles.length == editStreams.length);
-     int newsize = editStreams.length - 1;
-     int oldsize = editStreams.length;
-
-     //
-     // save existing values and allocate space for new ones
-     //
-     File[] editFiles1 = editFiles;
-     File[] editFilesNew1 = editFilesNew;
-     DataOutputStream[] editStreams1 = editStreams;
-     FileDescriptor[] editDescriptors1 = editDescriptors;
-     editFiles = new File[newsize];
-     editFilesNew = new File[newsize];
-     editStreams = new DataOutputStream[newsize];
-     editDescriptors = new FileDescriptor[newsize];
+     assert(index < getNumStorageDirs());
+     assert(getNumStorageDirs() == editStreams.size());
 
-     //
-     // copy values from old into new, skip the one with error.
-     //
-     for (int idx = 0; idx < index; idx++) {
-       editFiles[idx] = editFiles1[idx];
-       editFilesNew[idx] = editFilesNew1[idx];
-       editStreams[idx] = editStreams1[idx];
-       editDescriptors[idx] = editDescriptors1[idx];
-     }
-     for (int idx = index; idx < oldsize - 1; idx++) {
-       editFiles[idx] = editFiles1[idx+1];
-       editFilesNew[idx] = editFilesNew1[idx+1];
-       editStreams[idx] = editStreams1[idx+1];
-       editDescriptors[idx] = editDescriptors1[idx+1];
-     }
+     editStreams.remove( index );
      //
      // Invoke the ioerror routine of the fsimage
      //
@@ -174,41 +170,11 @@
    }
 
   /**
-   * Delete specified editLog
-   */
-  void delete(int idx) throws IOException {
-    if (editStreams != null) {
-      try {
-        editStreams[idx].close();
-      } catch (IOException e) {
-        processIOError(idx);
-      }
-    }
-    if (!editFiles[idx].delete() || !editFilesNew[idx].delete()) {
-      if (editStreams != null) {
-        processIOError(idx);
-      }
-    }
-  }
-
-  /**
-   * check if ANY edits log exists
-   */
-  boolean exists() throws IOException {
-    for (int idx = 0; idx < editFiles.length; idx++) {
-      if (editFiles[idx].exists()) { 
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * check if ANY edits.new log exists
    */
   boolean existsNew() throws IOException {
-    for (int idx = 0; idx < editFilesNew.length; idx++) {
-      if (editFilesNew[idx].exists()) { 
+    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+      if (getEditNewFile( idx ).exists()) { 
         return true;
       }
     }
@@ -216,32 +182,11 @@
   }
 
   /**
-   * check if a particular edits.new log exists
-   */
-  boolean existsNew(int idx) throws IOException {
-    if (editFilesNew[idx].exists()) { 
-      return true;
-    }
-    return false;
-  }
-
-  
-  /**
    * Load an edit log, and apply the changes to the in-memory structure
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  int loadFSEdits(Configuration conf, int index) throws IOException {
-    int numEdits = 0;
-    numEdits = loadFSEdits(conf, editFiles[index]);
-    if (editFilesNew[index].exists()) { 
-      numEdits += loadFSEdits(conf, editFilesNew[index]);
-    }
-    return numEdits;
-  }
-
-  int loadFSEdits( Configuration conf, File edits)
-                                                 throws IOException {
+  int loadFSEdits( File edits ) throws IOException {
     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
@@ -267,15 +212,15 @@
           logVersion = 0;
         else
           logVersion = in.readInt();
-        if( logVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+        if( logVersion < FSConstants.LAYOUT_VERSION ) // future version
           throw new IOException(
               "Unexpected version of the file system log file: "
               + logVersion
               + ". Current version = " 
-              + FSConstants.DFS_CURRENT_VERSION + "." );
+              + FSConstants.LAYOUT_VERSION + "." );
       }
       
-      short replication = (short)conf.getInt("dfs.replication", 3);
+      short replication = fsNamesys.getDefaultReplication();
       try {
         while (true) {
           byte opcode = -1;
@@ -304,7 +249,7 @@
               name = (UTF8) writables[0];
               replication = Short.parseShort(
                   ((UTF8)writables[1]).toString());
-              replication = adjustReplication( replication, conf );
+              replication = adjustReplication( replication );
             }
             // get blocks
             aw = new ArrayWritable(Block.class);
@@ -321,9 +266,7 @@
             UTF8 repl = new UTF8();
             src.readFields(in);
             repl.readFields(in);
-            replication = adjustReplication(
-                            fromLogReplication(repl),
-                            conf);
+            replication = adjustReplication( fromLogReplication(repl) );
             fsDir.unprotectedSetReplication(src.toString(), 
                 replication,
                 null);
@@ -383,17 +326,18 @@
       }
     }
     
-    if( logVersion != FSConstants.DFS_CURRENT_VERSION ) // other version
+    if( logVersion != FSConstants.LAYOUT_VERSION ) // other version
       numEdits++; // save this image asap
     return numEdits;
   }
   
-  static short adjustReplication( short replication, Configuration conf) {
-    short minReplication = (short)conf.getInt("dfs.replication.min", 1);
+  static short adjustReplication( short replication) {
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    short minReplication = fsNamesys.getMinReplication();
     if( replication<minReplication ) {
       replication = minReplication;
     }
-    short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
+    short maxReplication = fsNamesys.getMaxReplication();
     if( replication>maxReplication ) {
       replication = maxReplication;
     }
@@ -404,18 +348,19 @@
    * Write an operation to the edit log
    */
   void logEdit(byte op, Writable w1, Writable w2) {
-    for (int idx = 0; idx < editStreams.length; idx++) {
-      synchronized (editStreams[idx]) {
+    assert this.getNumEditStreams() > 0 : "no editlog streams";
+    for (int idx = 0; idx < editStreams.size(); idx++) {
+      EditLogOutputStream eStream;
+      synchronized ( eStream = editStreams.get( idx ) ) {
         try {
-          editStreams[idx].write(op);
+          eStream.write(op);
           if (w1 != null) {
-            w1.write(editStreams[idx]);
+            w1.write( eStream );
           }
           if (w2 != null) {
-            w2.write(editStreams[idx]);
+            w2.write( eStream );
           }
-          editStreams[idx].flush();
-          editDescriptors[idx].sync();
+          eStream.flushAndSync();
         } catch (IOException ie) {
           try {
             processIOError(idx);         
@@ -500,12 +445,12 @@
    * Return the size of the current EditLog
    */
   long getEditLogSize() throws IOException {
-    assert(editFiles.length == editStreams.length);
+    assert(getNumStorageDirs() == editStreams.size());
     long size = 0;
-    for (int idx = 0; idx < editFiles.length; idx++) {
-      synchronized (editStreams[idx]) {
-        assert(size == 0 || size == editFiles[idx].length());
-        size = editFiles[idx].length();
+    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+      synchronized (editStreams.get( idx )) {
+        assert(size == 0 || size == getEditFile( idx ).length());
+        size = getEditFile( idx ).length();
       }
     }
     return size;
@@ -527,12 +472,11 @@
     //
     // Open edits.new
     //
-    for (int idx = 0; idx < editFiles.length; idx++ ) {
+    for (int idx = 0; idx < getNumStorageDirs(); idx++ ) {
       try {
-        FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
-        editStreams[idx] = new DataOutputStream(stream);
-        editDescriptors[idx] = stream.getFD();
-        editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
+        EditLogOutputStream eStream = new EditLogOutputStream( getEditNewFile( idx ));
+        eStream.create();
+        editStreams.add( eStream );
       } catch (IOException e) {
         processIOError(idx);
         idx--;
@@ -541,41 +485,10 @@
   }
 
   /**
-   * Closes the current edit log and opens edits.new. 
-   * If edits.new already exists, then ignore it.
-   */
-  void rollEditLogIfNeeded() throws IOException {
-
-    //
-    // Open edits.new
-    //
-    for (int idx = 0; idx < editFiles.length; idx++ ) {
-      if (existsNew(idx)) {
-        continue;
-      }
-      try {
-        FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
-        editStreams[idx] = new DataOutputStream(stream);
-        editDescriptors[idx] = stream.getFD();
-        editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
-      } catch (IOException e) {
-        processIOError(idx);
-        idx--;
-      }
-    }
-  }
-  /**
   * Removes the old edit log and renames edits.new to edits.
    * Reopens the edits file.
    */
   void purgeEditLog() throws IOException {
-    purgeEditLog(true);
-  }
-
-  /**
-   * Removes the old edit log and renamed edits.new as edits.
-   */
-  void purgeEditLog(boolean reopenEdits) throws IOException {
     //
     // If edits.new does not exist, then return an error.
     //
@@ -588,14 +501,14 @@
     //
     // Delete edits and rename edits.new to edits.
     //
-    for (int idx = 0; idx < editFiles.length; idx++ ) {
-      if (!editFilesNew[idx].renameTo(editFiles[idx])) {
+    for (int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+      if (!getEditNewFile( idx ).renameTo(getEditFile( idx ))) {
         //
         // renameTo() fails on Windows if the destination
         // file exists.
         //
-        editFiles[idx].delete();
-        if (!editFilesNew[idx].renameTo(editFiles[idx])) {
+        getEditFile( idx ).delete();
+        if (!getEditNewFile( idx ).renameTo(getEditFile( idx ))) {
           processIOError(idx); 
           idx--; 
         }
@@ -604,24 +517,13 @@
     //
     // Reopen all the edits logs.
     //
-    boolean append = true;
-    for (int idx = 0; reopenEdits && idx < editStreams.length; idx++) {
-      try {
-        FileOutputStream stream = new FileOutputStream(editFiles[idx],
-                                                       append);
-        editStreams[idx] = new DataOutputStream(stream);
-        editDescriptors[idx] = stream.getFD();
-      } catch (IOException e) {
-        processIOError(idx); 
-        idx--; 
-      }
-    }
+    open();
   }
 
   /**
    * Return the name of the edit file
    */
   File getFsEditName() throws IOException {
-      return editFiles[0];
+      return getEditFile( 0 );
   }
 }
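
The EditLogOutputStream introduced here bundles an append-mode
FileOutputStream with its FileDescriptor so each logged operation can be
flushed and fsync'ed as a unit. The same pattern in isolation, independent of
the Hadoop types (a sketch, not code from this patch):

    import java.io.*;

    // Keep the FileDescriptor so buffered records can be forced to disk.
    class DurableLog extends DataOutputStream {
      private final FileDescriptor fd;

      DurableLog(File name) throws IOException {
        super(new FileOutputStream(name, true));   // open for append
        this.fd = ((FileOutputStream) out).getFD();
      }

      void flushAndSync() throws IOException {
        flush();      // push buffered bytes to the OS
        fd.sync();    // force them to the storage device
      }
    }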