Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/07 03:11:17 UTC

svn commit: r1132846 [2/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/c++/libhdfs/m4/ src/c++/libhdfs/tests/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/bin/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apac...

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Jun  7 01:11:15 2011
@@ -39,10 +39,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -107,11 +110,11 @@ public class DataStorage extends Storage
     this.storageID = newStorageID;
   }
   
-  synchronized void createStorageID() {
+  synchronized void createStorageID(int datanodePort) {
     if (storageID != null && !storageID.isEmpty()) {
       return;
     }
-    storageID = DataNode.createNewStorageId();
+    storageID = DataNode.createNewStorageId(datanodePort);
   }
   
   /**
@@ -128,10 +131,9 @@ public class DataStorage extends Storage
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(NamespaceInfo nsInfo,
-                             Collection<File> dataDirs,
-                             StartupOption startOpt
-                             ) throws IOException {
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+      throws IOException {
     if (initilized) {
       // DN storage has been initialized, no need to do anything
       return;
@@ -190,13 +192,13 @@ public class DataStorage extends Storage
     // During startup some of them can upgrade or rollback 
     // while others could be uptodate for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
         "Data-node and name-node layout versions must be the same.";
     }
     
     // make sure we have storage id set - if not - generate new one
-    createStorageID();
+    createStorageID(datanode.getPort());
     
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
@@ -208,16 +210,17 @@ public class DataStorage extends Storage
   /**
    * recoverTransitionRead for a specific block pool
    * 
+   * @param datanode DataNode
    * @param bpID Block pool Id
    * @param nsInfo Namespace info of namenode corresponding to the block pool
    * @param dataDirs Storage directories
    * @param startOpt startup option
    * @throws IOException on error
    */
-  void recoverTransitionRead(String bpID, NamespaceInfo nsInfo,
+  void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     // First ensure datanode level format/snapshot/rollback is completed
-    recoverTransitionRead(nsInfo, dataDirs, startOpt);
+    recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
     
     // Create list of storage directories for the block pool
     Collection<File> bpDataDirs = new ArrayList<File>();
@@ -232,7 +235,7 @@ public class DataStorage extends Storage
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
         nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
     
-    bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
     addBlockPoolStorage(bpID, bpStorage);
   }
 
@@ -287,8 +290,8 @@ public class DataStorage extends Storage
     props.setProperty("cTime", String.valueOf(cTime));
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
     props.setProperty("storageID", getStorageID());
-    // Set NamespaceID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    // Set NamespaceID in version before federation
+    if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       props.setProperty("namespaceID", String.valueOf(namespaceID));
     }
   }
@@ -305,8 +308,8 @@ public class DataStorage extends Storage
     setStorageType(props, sd);
     setClusterId(props, layoutVersion, sd);
     
-    // Read NamespaceID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    // Read NamespaceID in version before federation
+    if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       setNamespaceID(props, sd);
     }
     
@@ -356,12 +359,14 @@ public class DataStorage extends Storage
    * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
    * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
    * 
+   * @param datanode Datanode to which this storage belongs to
    * @param sd  storage directory
    * @param nsInfo  namespace info
    * @param startOpt  startup option
    * @throws IOException
    */
-  private void doTransition( StorageDirectory sd, 
+  private void doTransition( DataNode datanode,
+                             StorageDirectory sd, 
                              NamespaceInfo nsInfo, 
                              StartupOption startOpt
                              ) throws IOException {
@@ -373,8 +378,10 @@ public class DataStorage extends Storage
     assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
       "Future version is not allowed";
     
+    boolean federationSupported = 
+      LayoutVersion.supports(Feature.FEDERATION, layoutVersion);
     // For pre-federation version - validate the namespaceID
-    if (layoutVersion >= Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION &&
+    if (!federationSupported &&
         getNamespaceID() != nsInfo.getNamespaceID()) {
       throw new IOException("Incompatible namespaceIDs in "
           + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
@@ -382,8 +389,8 @@ public class DataStorage extends Storage
           + getNamespaceID());
     }
     
-    // For post federation version, validate clusterID
-    if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION
+    // For version that supports federation, validate clusterID
+    if (federationSupported
         && !getClusterID().equals(nsInfo.getClusterID())) {
       throw new IOException("Incompatible clusterIDs in "
           + sd.getRoot().getCanonicalPath() + ": namenode clusterID = "
@@ -395,7 +402,10 @@ public class DataStorage extends Storage
         && this.cTime == nsInfo.getCTime())
       return; // regular startup
     // verify necessity of a distributed upgrade
-    verifyDistributedUpgradeProgress(nsInfo);
+    UpgradeManagerDatanode um = 
+      datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+    verifyDistributedUpgradeProgress(um, nsInfo);
+    
     // do upgrade
     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
@@ -435,7 +445,7 @@ public class DataStorage extends Storage
    * @throws IOException on error
    */
   void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
-    if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       clusterID = nsInfo.getClusterID();
       layoutVersion = nsInfo.getLayoutVersion();
       sd.write();
@@ -493,10 +503,10 @@ public class DataStorage extends Storage
    * @throws IOException if the directory is not empty or it can not be removed
    */
   private void cleanupDetachDir(File detachDir) throws IOException {
-    if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
+    if (!LayoutVersion.supports(Feature.APPEND_RBW_DIR, layoutVersion) &&
         detachDir.exists() && detachDir.isDirectory() ) {
       
-        if (detachDir.list().length != 0 ) {
+        if (FileUtil.list(detachDir).length != 0 ) {
           throw new IOException("Detached directory " + detachDir +
               " is not empty. Please manually move each file under this " +
               "directory to the finalized directory if the finalized " +
@@ -626,7 +636,7 @@ public class DataStorage extends Storage
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
-    if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+    if (LayoutVersion.supports(Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
       linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
@@ -725,11 +735,9 @@ public class DataStorage extends Storage
     }
   }
 
-  private void verifyDistributedUpgradeProgress(
+  private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
                   NamespaceInfo nsInfo
                 ) throws IOException {
-    UpgradeManagerDatanode um = 
-      DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
     assert um != null : "DataNode.upgradeManager is null.";
     um.setUpgradeState(false, getLayoutVersion());
     um.initializeUpgrade(nsInfo);
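
The dominant change in DataStorage above, repeated throughout this commit, replaces raw layout-version comparisons such as layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION with named-feature checks via LayoutVersion.supports(Feature, int). The standalone sketch below illustrates the contract implied by the hunks; it is not the real org.apache.hadoop.hdfs.protocol.LayoutVersion, and the -35 value for FEDERATION is illustrative, though FILE_ACCESS_TIME at -17 and EDITS_CHESKUM at -28 match the numeric checks this commit removes elsewhere in this mail.

// Standalone sketch of the named-feature check used throughout this commit.
// HDFS layout versions are negative and become more negative as features are
// added, so a version "supports" a feature when it is <= the version that
// introduced it.
public class LayoutVersionSketch {
  public enum Feature {
    FILE_ACCESS_TIME(-17),  // matches the "logVersion <= -17" checks removed
                            // elsewhere in this commit
    EDITS_CHESKUM(-28),     // matches "logVersion <= -28"; the enum name keeps
                            // the historical spelling used in the hunks above
    FEDERATION(-35);        // value illustrative only

    private final int lv;   // first layout version carrying this feature
    Feature(int lv) { this.lv = lv; }
    int getLayoutVersion() { return lv; }
  }

  public static boolean supports(Feature f, int layoutVersion) {
    return layoutVersion <= f.getLayoutVersion();
  }

  public static void main(String[] args) {
    System.out.println(supports(Feature.EDITS_CHESKUM, -30)); // true
    System.out.println(supports(Feature.FEDERATION, -30));    // false
  }
}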

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Jun  7 01:11:15 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -38,11 +39,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Periodically scans the data directories for block and block metadata files.
@@ -54,6 +57,7 @@ public class DirectoryScanner implements
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
   private static final int DEFAULT_SCAN_INTERVAL = 21600;
 
+  private final DataNode datanode;
   private final FSDataset dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
@@ -218,7 +222,8 @@ public class DirectoryScanner implements
     }
   }
 
-  DirectoryScanner(FSDataset dataset, Configuration conf) {
+  DirectoryScanner(DataNode dn, FSDataset dataset, Configuration conf) {
+    this.datanode = dn;
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DEFAULT_SCAN_INTERVAL);
@@ -270,7 +275,7 @@ public class DirectoryScanner implements
       String[] bpids = dataset.getBPIdlist();
       for(String bpid : bpids) {
         UpgradeManagerDatanode um = 
-          DataNode.getUpgradeManagerDatanode(bpid);
+          datanode.getUpgradeManagerDatanode(bpid);
         if (um != null && !um.isUpgradeCompleted()) {
           //If distributed upgrades underway, exit and wait for next cycle.
           LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
@@ -480,9 +485,15 @@ public class DirectoryScanner implements
     /** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
     private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir,
         LinkedList<ScanInfo> report) {
-      File[] files = dir.listFiles();
+      File[] files;
+      try {
+        files = FileUtil.listFiles(dir);
+      } catch (IOException ioe) {
+        LOG.warn("Exception occured while compiling report: ", ioe);
+        // Ignore this directory and proceed.
+        return report;
+      }
       Arrays.sort(files);
-
       /*
        * Assumption: In the sorted list of files block file appears immediately
        * before block metadata file. This is true for the current naming
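
The compileReport() change above swaps File.listFiles(), which returns null on an I/O error or unreadable directory, for FileUtil.listFiles(), which surfaces that case as an IOException the scanner can log and skip. A standalone sketch of the contract assumed here (the real helper is org.apache.hadoop.fs.FileUtil; this version only illustrates the behavior):

import java.io.File;
import java.io.IOException;

// Standalone illustration of the FileUtil.listFiles contract relied on above:
// never return null; surface an unreadable directory as an IOException.
public class ListFilesSketch {
  public static File[] listFiles(File dir) throws IOException {
    File[] files = dir.listFiles();
    if (files == null) {
      // File.listFiles() returns null both for non-directories and for
      // I/O errors; converting that to an exception lets compileReport()
      // log the failure and skip the directory instead of hitting an NPE.
      throw new IOException("Invalid directory or I/O error on " + dir);
    }
    return files;
  }

  public static void main(String[] args) throws IOException {
    for (File f : listFiles(new File("."))) {
      System.out.println(f.getName());
    }
  }
}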

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Jun  7 01:11:15 2011
@@ -99,7 +99,7 @@ public class FSDataset implements FSCons
                                 dir.toString());
         }
       } else {
-        File[] files = dir.listFiles();
+        File[] files = FileUtil.listFiles(dir); 
         int numChildren = 0;
         for (int idx = 0; idx < files.length; idx++) {
           if (files[idx].isDirectory()) {
@@ -187,7 +187,7 @@ public class FSDataset implements FSCons
      * original file name; otherwise the tmp file is deleted.
      */
     private void recoverTempUnlinkedBlock() throws IOException {
-      File files[] = dir.listFiles();
+      File files[] = FileUtil.listFiles(dir);
       for (File file : files) {
         if (!FSDataset.isUnlinkTmpFile(file)) {
           continue;
@@ -420,9 +420,9 @@ public class FSDataset implements FSCons
      * @param isFinalized true if the directory has finalized replicas;
      *                    false if the directory has rbw replicas
      */
-    private void addToReplicasMap(ReplicasMap volumeMap, 
-        File dir, boolean isFinalized) {
-      File blockFiles[] = dir.listFiles();
+    private void addToReplicasMap(ReplicasMap volumeMap, File dir,
+        boolean isFinalized) throws IOException {
+      File blockFiles[] = FileUtil.listFiles(dir);
       for (File blockFile : blockFiles) {
         if (!Block.isBlockFilename(blockFile))
           continue;
@@ -724,10 +724,10 @@ public class FSDataset implements FSCons
       File finalizedDir = new File(bpCurrentDir,
           DataStorage.STORAGE_DIR_FINALIZED);
       File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-      if (finalizedDir.exists() && finalizedDir.list().length != 0) {
+      if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
         return false;
       }
-      if (rbwDir.exists() && rbwDir.list().length != 0) {
+      if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
         return false;
       }
       return true;
@@ -756,7 +756,7 @@ public class FSDataset implements FSCons
           throw new IOException("Failed to delete " + finalizedDir);
         }
         FileUtil.fullyDelete(tmpDir);
-        for (File f : bpCurrentDir.listFiles()) {
+        for (File f : FileUtil.listFiles(bpCurrentDir)) {
           if (!f.delete()) {
             throw new IOException("Failed to delete " + f);
           }
@@ -764,7 +764,7 @@ public class FSDataset implements FSCons
         if (!bpCurrentDir.delete()) {
           throw new IOException("Failed to delete " + bpCurrentDir);
         }
-        for (File f : bpDir.listFiles()) {
+        for (File f : FileUtil.listFiles(bpDir)) {
           if (!f.delete()) {
             throw new IOException("Failed to delete " + f);
           }
@@ -1118,6 +1118,7 @@ public class FSDataset implements FSCons
     return f;
   }
     
+  private final DataNode datanode;
   final FSVolumeSet volumes;
   private final int maxBlocksPerDir;
   final ReplicasMap volumeMap;
@@ -1133,7 +1134,9 @@ public class FSDataset implements FSCons
   /**
    * An FSDataset has a directory where it loads its data files.
    */
-  public FSDataset(DataStorage storage, Configuration conf) throws IOException {
+  public FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
+      throws IOException {
+    this.datanode = datanode;
     this.maxBlocksPerDir = 
       conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
                   DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
@@ -2000,7 +2003,6 @@ public class FSDataset implements FSCons
         return f;
    
       // if file is not null, but doesn't exist - possibly disk failed
-      DataNode datanode = DataNode.getDataNode();
       datanode.checkDiskError();
     }
     
@@ -2246,7 +2248,6 @@ public class FSDataset implements FSCons
    */
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
-    DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
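
The FSDataset hunks also remove the DataNode.getDataNode() static lookups in favor of a DataNode reference injected through the constructor, which is what lets the new recoverTransitionRead and DirectoryScanner signatures above carry a datanode parameter (and, presumably, lets multiple DataNode instances coexist in one JVM for tests). A minimal sketch of the pattern with illustrative names:

// Before (removed above): a hidden static lookup tied FSDataset to the single
// process-wide DataNode:
//   DataNode datanode = DataNode.getDataNode();
//   datanode.checkDiskError();
// After: the owning DataNode is passed in once and kept in a final field.
class DataNodeSketch {
  void checkDiskError() { System.out.println("disk check scheduled"); }
}

class FSDatasetSketch {
  private final DataNodeSketch datanode;

  FSDatasetSketch(DataNodeSketch datanode) {
    this.datanode = datanode;   // explicit, testable dependency
  }

  void onMissingBlockFile() {
    datanode.checkDiskError();  // no global lookup
  }

  public static void main(String[] args) {
    new FSDatasetSketch(new DataNodeSketch()).onMissingBlockFile();
  }
}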

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java Tue Jun  7 01:11:15 2011
@@ -125,7 +125,7 @@ public abstract class UpgradeObjectDatan
     // Complete the upgrade by calling the manager method
     try {
       UpgradeManagerDatanode upgradeManager = 
-        DataNode.getUpgradeManagerDatanode(bpid);
+        dataNode.getUpgradeManagerDatanode(bpid);
       if(upgradeManager != null)
         upgradeManager.completeUpgrade();
     } catch(IOException e) {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Tue Jun  7 01:11:15 2011
@@ -27,6 +27,8 @@ import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
@@ -245,7 +247,7 @@ public class BackupImage extends FSImage
           BufferedInputStream bin = new BufferedInputStream(backupInputStream);
           DataInputStream in = new DataInputStream(bin);
           Checksum checksum = null;
-          if (logVersion <= -28) { // support fsedits checksum
+          if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
             checksum = FSEditLog.getChecksum();
             in = new DataInputStream(new CheckedInputStream(bin, checksum));
           }
@@ -356,7 +358,7 @@ public class BackupImage extends FSImage
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
       int logVersion = logLoader.readLogVersion(in);
       Checksum checksum = null;
-      if (logVersion <= -28) { // support fsedits checksum
+      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         checksum = FSEditLog.getChecksum();
         in = new DataInputStream(new CheckedInputStream(bin, checksum));
       }
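
Both hunks above gate the checksum wrapping on Feature.EDITS_CHESKUM rather than the bare logVersion <= -28. The wrapping itself uses java.util.zip.CheckedInputStream, so every byte read from the edits stream also updates a running checksum that can later be compared with the value recorded in the log. A self-contained sketch, with CRC32 standing in for whatever FSEditLog.getChecksum() returns in the real code:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;

// Self-contained sketch of the checksum-wrapped edits stream enabled when
// the log version supports Feature.EDITS_CHESKUM.
public class CheckedEditsSketch {
  public static void main(String[] args) throws IOException {
    byte[] edits = "opcode-payload".getBytes("UTF-8");
    Checksum checksum = new CRC32();
    DataInputStream in = new DataInputStream(
        new CheckedInputStream(new ByteArrayInputStream(edits), checksum));
    in.readFully(new byte[edits.length]);  // every read updates the checksum
    System.out.println("running checksum = " + checksum.getValue());
    // The real loader compares this value against the checksum recorded in
    // the edits and aborts the load on a mismatch.
  }
}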

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Tue Jun  7 01:11:15 2011
@@ -369,13 +369,12 @@ class ClusterJspHelper {
       nn.filesAndDirectories = mxbeanProxy.getTotalFiles();
       nn.capacity = mxbeanProxy.getTotal();
       nn.free = mxbeanProxy.getFree();
-      nn.dfsUsed = mxbeanProxy.getUsed();
+      nn.bpUsed = mxbeanProxy.getBlockPoolUsedSpace();
       nn.nonDfsUsed = mxbeanProxy.getNonDfsUsedSpace();
       nn.blocksCount = mxbeanProxy.getTotalBlocks();
       nn.missingBlocksCount = mxbeanProxy.getNumberOfMissingBlocks();
-      nn.capacity = mxbeanProxy.getTotal();
       nn.free = mxbeanProxy.getFree();
-      nn.httpAddress = DFSUtil.getInfoServer(rpcAddress, conf);
+      nn.httpAddress = DFSUtil.getInfoServer(rpcAddress, conf, false);
       getLiveNodeCount(mxbeanProxy.getLiveNodes(), nn);
       getDeadNodeCount(mxbeanProxy.getDeadNodes(), nn);
       return nn;
@@ -535,9 +534,9 @@ class ClusterJspHelper {
     String clusterid = "";
     long total_sum = 0;
     long free_sum = 0;
-    long used = 0;
+    long clusterDfsUsed = 0;
     long nonDfsUsed_sum = 0;
-    long totalFilesAndBlocks = 0;
+    long totalFilesAndDirectories = 0;
     
     /** List of namenodes in the cluster */
     final List<NamenodeStatus> nnList = new ArrayList<NamenodeStatus>();
@@ -553,10 +552,10 @@ class ClusterJspHelper {
       nnList.add(nn);
       
       // Add namenode status to cluster status
-      totalFilesAndBlocks += nn.filesAndDirectories;
+      totalFilesAndDirectories += nn.filesAndDirectories;
       total_sum += nn.capacity;
       free_sum += nn.free;
-      used += nn.dfsUsed;
+      clusterDfsUsed += nn.bpUsed;
       nonDfsUsed_sum += nn.nonDfsUsed;
     }
 
@@ -580,7 +579,7 @@ class ClusterJspHelper {
         total = total_sum / size;
         free = free_sum / size;
         nonDfsUsed = nonDfsUsed_sum / size;
-        dfsUsedPercent = DFSUtil.getPercentUsed(used, total_sum);
+        dfsUsedPercent = DFSUtil.getPercentUsed(clusterDfsUsed, total);
         dfsRemainingPercent = DFSUtil.getPercentRemaining(free, total);
       }
     
@@ -589,23 +588,23 @@ class ClusterJspHelper {
     
       doc.startTag("storage");
     
-      toXmlItemBlock(doc, "Total Files And Blocks",
-          Long.toString(totalFilesAndBlocks));
+      toXmlItemBlock(doc, "Total Files And Directories",
+          Long.toString(totalFilesAndDirectories));
     
       toXmlItemBlock(doc, "Configured Capacity", StringUtils.byteDesc(total));
     
-      toXmlItemBlock(doc, "Used", StringUtils.byteDesc(used));
+      toXmlItemBlock(doc, "DFS Used", StringUtils.byteDesc(clusterDfsUsed));
     
       toXmlItemBlock(doc, "Non DFS Used", StringUtils.byteDesc(nonDfsUsed));
     
-      toXmlItemBlock(doc, "Remaining", StringUtils.byteDesc(free));
+      toXmlItemBlock(doc, "DFS Remaining", StringUtils.byteDesc(free));
     
       // dfsUsedPercent
-      toXmlItemBlock(doc, "Used%", StringUtils.limitDecimalTo2(dfsUsedPercent)
-          + "%");
+      toXmlItemBlock(doc, "DFS Used%", 
+          StringUtils.limitDecimalTo2(dfsUsedPercent)+ "%");
     
       // dfsRemainingPercent
-      toXmlItemBlock(doc, "Remaining%",
+      toXmlItemBlock(doc, "DFS Remaining%",
           StringUtils.limitDecimalTo2(dfsRemainingPercent) + "%");
     
       doc.endTag(); // storage
@@ -617,8 +616,8 @@ class ClusterJspHelper {
       for (NamenodeStatus nn : nnList) {
         doc.startTag("node");
         toXmlItemBlockWithLink(doc, nn.host, nn.httpAddress, "NameNode");
-        toXmlItemBlock(doc, "Used",
-            StringUtils.byteDesc(nn.dfsUsed));
+        toXmlItemBlock(doc, "Blockpool Used",
+            StringUtils.byteDesc(nn.bpUsed));
         toXmlItemBlock(doc, "Files And Directories",
             Long.toString(nn.filesAndDirectories));
         toXmlItemBlock(doc, "Blocks", Long.toString(nn.blocksCount));
@@ -648,7 +647,7 @@ class ClusterJspHelper {
     String host = "";
     long capacity = 0L;
     long free = 0L;
-    long dfsUsed = 0L;
+    long bpUsed = 0L;
     long nonDfsUsed = 0L;
     long filesAndDirectories = 0L;
     long blocksCount = 0L;

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jun  7 01:11:15 2011
@@ -701,6 +701,12 @@ public class FSEditLog  {
   void logUpdateMasterKey(DelegationKey key) {
     logEdit(OP_UPDATE_MASTER_KEY, key);
   }
+
+  void logReassignLease(String leaseHolder, String src, String newHolder) {
+    logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
+        new DeprecatedUTF8(src),
+        new DeprecatedUTF8(newHolder));
+  }
   
   static private DeprecatedUTF8 toLogReplication(short replication) {
     return new DeprecatedUTF8(Short.toString(replication));
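
logReassignLease() appends an OP_REASSIGN_LEASE record whose payload is three writable strings: the current lease holder, the path, and the new holder. A hedged sketch of that record's round trip using plain DataOutput/DataInput (the real code uses DeprecatedUTF8 values inside FSEditLog's own framing, which this simplification omits):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustration only: a reassign-lease record as (opcode, holder, src, newHolder).
public class ReassignLeaseRecordSketch {
  static final byte OP_REASSIGN_LEASE = 22;

  static void write(DataOutput out, String holder, String src, String newHolder)
      throws IOException {
    out.writeByte(OP_REASSIGN_LEASE);
    out.writeUTF(holder);
    out.writeUTF(src);
    out.writeUTF(newHolder);
  }

  static void replay(DataInput in) throws IOException {
    if (in.readByte() == OP_REASSIGN_LEASE) {
      String holder = in.readUTF();
      String src = in.readUTF();
      String newHolder = in.readUTF();
      // The loader side (FSEditLogLoader below) looks up the lease held by
      // 'holder' on 'src' and hands it to 'newHolder'.
      System.out.println("reassign " + src + ": " + holder + " -> " + newHolder);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    write(new DataOutputStream(buf), "client-1", "/tmp/f", "HDFS_NameNode");
    replay(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
  }
}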

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jun  7 01:11:15 2011
@@ -37,10 +37,13 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -114,7 +117,7 @@ public class FSEditLogLoader {
     try {
       logVersion = readLogVersion(in);
       Checksum checksum = null;
-      if (logVersion <= -28) { // support fsedits checksum
+      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         checksum = FSEditLog.getChecksum();
         in = new DataInputStream(new CheckedInputStream(bin, checksum));
       }
@@ -145,7 +148,7 @@ public class FSEditLogLoader {
         numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, 
         numOpSymlink = 0, numOpGetDelegationToken = 0,
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
-        numOpUpdateMasterKey = 0, numOpOther = 0;
+        numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0;
 
     // Keep track of the file offsets of the last several opcodes.
     // This is handy when manually recovering corrupted edits files.
@@ -208,7 +211,7 @@ public class FSEditLogLoader {
             path = FSImageSerialization.readString(in);
             short replication = fsNamesys.adjustReplication(readShort(in));
             mtime = readLong(in);
-            if (logVersion <= -17) {
+            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
               atime = readLong(in);
             }
             if (logVersion < -7) {
@@ -294,10 +297,6 @@ public class FSEditLogLoader {
             break;
           } 
           case OP_CONCAT_DELETE: {
-            if (logVersion > -22) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpConcatDelete++;
             int length = in.readInt();
             if (length < 3) { // trg, srcs.., timestam
@@ -356,7 +355,7 @@ public class FSEditLogLoader {
             // The disk format stores atimes for directories as well.
             // However, currently this is not being updated/used because of
             // performance reasons.
-            if (logVersion <= -17) {
+            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
               atime = readLong(in);
             }
   
@@ -387,9 +386,6 @@ public class FSEditLogLoader {
           }
           case OP_SET_PERMISSIONS: {
             numOpSetPerm++;
-            if (logVersion > -11)
-              throw new IOException("Unexpected opCode " + opCode
-                                    + " for version " + logVersion);
             fsDir.unprotectedSetPermission(
                 FSImageSerialization.readString(in), FsPermission.read(in));
             break;
@@ -405,20 +401,12 @@ public class FSEditLogLoader {
             break;
           }
           case OP_SET_NS_QUOTA: {
-            if (logVersion > -16) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
                                       readLongWritable(in), 
                                       FSConstants.QUOTA_DONT_SET);
             break;
           }
           case OP_CLEAR_NS_QUOTA: {
-            if (logVersion > -16) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
                                       FSConstants.QUOTA_RESET,
                                       FSConstants.QUOTA_DONT_SET);
@@ -461,10 +449,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_RENAME: {
-            if (logVersion > -21) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpRename++;
             int length = in.readInt();
             if (length != 3) {
@@ -481,10 +465,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
-            if (logVersion > -24) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpGetDelegationToken++;
             DelegationTokenIdentifier delegationTokenId = 
                 new DelegationTokenIdentifier();
@@ -495,10 +475,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_RENEW_DELEGATION_TOKEN: {
-            if (logVersion > -24) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpRenewDelegationToken++;
             DelegationTokenIdentifier delegationTokenId = 
                 new DelegationTokenIdentifier();
@@ -509,10 +485,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_CANCEL_DELEGATION_TOKEN: {
-            if (logVersion > -24) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpCancelDelegationToken++;
             DelegationTokenIdentifier delegationTokenId = 
                 new DelegationTokenIdentifier();
@@ -522,10 +494,6 @@ public class FSEditLogLoader {
             break;
           }
           case OP_UPDATE_MASTER_KEY: {
-            if (logVersion > -24) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             numOpUpdateMasterKey++;
             DelegationKey delegationKey = new DelegationKey();
             delegationKey.readFields(in);
@@ -533,12 +501,19 @@ public class FSEditLogLoader {
                 delegationKey);
             break;
           }
+          case OP_REASSIGN_LEASE: {
+            numOpReassignLease++;
+            String leaseHolder = FSImageSerialization.readString(in);
+            path = FSImageSerialization.readString(in);
+            String newHolder = FSImageSerialization.readString(in);
+            Lease lease = fsNamesys.leaseManager.getLease(leaseHolder);
+            INodeFileUnderConstruction pendingFile =
+                (INodeFileUnderConstruction) fsDir.getFileINode(path);
+            fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile);
+            break;
+          }
           case OP_START_LOG_SEGMENT:
           case OP_END_LOG_SEGMENT: {
-            if (logVersion > FSConstants.FIRST_TXNID_BASED_LAYOUT_VERSION) {
-              throw new IOException("Unexpected opCode " + opCode
-                  + " for version " + logVersion);
-            }
             // no data in here currently.
             break;
           }
@@ -587,6 +562,7 @@ public class FSEditLogLoader {
           + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
           + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
           + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
+          + " numOpReassignLease = " + numOpReassignLease
           + " numOpOther = " + numOpOther);
     }
     return numEdits;

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Tue Jun  7 01:11:15 2011
@@ -53,9 +53,10 @@ public enum FSEditLogOpCodes {
   OP_RENEW_DELEGATION_TOKEN     ((byte) 19),
   OP_CANCEL_DELEGATION_TOKEN    ((byte) 20),
   OP_UPDATE_MASTER_KEY          ((byte) 21),
-  OP_END_LOG_SEGMENT            ((byte) 22),
-  OP_START_LOG_SEGMENT          ((byte) 23),
-  
+  OP_REASSIGN_LEASE             ((byte) 22),
+  OP_END_LOG_SEGMENT            ((byte) 23),
+  OP_START_LOG_SEGMENT          ((byte) 24),
+
   // must be same as NamenodeProtocol.JA_JSPOOL_START
   OP_JSPOOL_START               ((byte)102);
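
Inserting OP_REASSIGN_LEASE at byte 22 shifts OP_END_LOG_SEGMENT and OP_START_LOG_SEGMENT up to 23 and 24. Presumably that renumbering is safe only because the HDFS-1073 branch has not shipped a frozen edit-log format using the old values (an assumption; the commit does not say). A sketch of the byte-to-opcode lookup such an enum implies:

import java.util.HashMap;
import java.util.Map;

// Sketch of the byte-to-opcode lookup implied by the enum above.
public class OpCodeLookupSketch {
  enum Op {
    OP_UPDATE_MASTER_KEY((byte) 21),
    OP_REASSIGN_LEASE((byte) 22),
    OP_END_LOG_SEGMENT((byte) 23),
    OP_START_LOG_SEGMENT((byte) 24);

    final byte code;
    Op(byte code) { this.code = code; }
  }

  private static final Map<Byte, Op> BY_CODE = new HashMap<Byte, Op>();
  static {
    for (Op op : Op.values()) {
      BY_CODE.put(op.code, op);
    }
  }

  public static void main(String[] args) {
    System.out.println(BY_CODE.get((byte) 22)); // OP_REASSIGN_LEASE
  }
}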
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Jun  7 01:11:15 2011
@@ -39,6 +39,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -225,12 +227,13 @@ public class FSImage implements Closeabl
     }
 
 
-    if (storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
+    int layoutVersion = storage.getLayoutVersion();
+    if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
       NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
     }
     if (startOpt != StartupOption.UPGRADE
-        && storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
-        && storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
+        && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
+        && layoutVersion != FSConstants.LAYOUT_VERSION) {
       throw new IOException(
           "\nFile system image contains an old layout version " 
           + storage.getLayoutVersion() + ".\nAn upgrade to version "
@@ -239,12 +242,12 @@ public class FSImage implements Closeabl
     }
     
     // Upgrade to federation requires -upgrade -clusterid <clusterID> option
-    if (startOpt == StartupOption.UPGRADE
-        && storage.getLayoutVersion() > Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    if (startOpt == StartupOption.UPGRADE && 
+        !LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       if (startOpt.getClusterId() == null) {
         throw new IOException(
             "\nFile system image contains an old layout version "
-                + storage.getLayoutVersion() + ".\nAn upgrade to version "
+                + layoutVersion + ".\nAn upgrade to version "
                 + FSConstants.LAYOUT_VERSION
                 + " is required.\nPlease restart NameNode with "
                 + "-upgrade -clusterid <clusterID> option.");
@@ -448,6 +451,14 @@ public class FSImage implements Closeabl
 
       // read and verify consistency of the prev dir
       sdPrev.read(sdPrev.getPreviousVersionFile());
+      if (prevState.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
+        throw new IOException(
+          "Cannot rollback to storage version " +
+          prevState.getLayoutVersion() +
+          " using this version of the NameNode, which uses storage version " +
+          FSConstants.LAYOUT_VERSION + ". " +
+          "Please use the previous version of HDFS to perform the rollback.");
+      }
       canRollback = true;
     }
     if (!canRollback)

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Jun  7 01:11:15 2011
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.io.MD5Hash;
@@ -139,7 +141,7 @@ class FSImageFormat {
          * it should not contain version and namespace fields
          */
         // read image version: first appeared in version -1
-        long imgVersion = in.readInt();
+        int imgVersion = in.readInt();
         if(getLayoutVersion() != imgVersion)
           throw new InconsistentFSStateException(curFile, 
               "imgVersion " + imgVersion +
@@ -168,7 +170,7 @@ class FSImageFormat {
 
         // read compression related info
         FSImageCompression compression;
-        if (imgVersion <= -25) {  // -25: 1st version providing compression option
+        if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imgVersion)) {
           compression = FSImageCompression.readCompressionHeader(conf, in);
         } else {
           compression = FSImageCompression.createNoopCompression();
@@ -179,7 +181,8 @@ class FSImageFormat {
 
         // load all inodes
         LOG.info("Number of files = " + numFiles);
-        if (imgVersion <= -30) {
+        if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
+            imgVersion)) {
           loadLocalNameINodes(numFiles, in);
         } else {
           loadFullNameINodes(numFiles, in);
@@ -228,6 +231,8 @@ class FSImageFormat {
    */  
    private void loadLocalNameINodes(long numFiles, DataInputStream in) 
    throws IOException {
+     assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
+         getLayoutVersion());
      assert numFiles > 0;
 
      // load root
@@ -322,11 +327,11 @@ class FSImageFormat {
     long atime = 0;
     long blockSize = 0;
     
-    long imgVersion = getLayoutVersion();
+    int imgVersion = getLayoutVersion();
     short replication = in.readShort();
     replication = namesystem.adjustReplication(replication);
     modificationTime = in.readLong();
-    if (imgVersion <= -17) {
+    if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
       atime = in.readLong();
     }
     if (imgVersion <= -8) {
@@ -365,17 +370,19 @@ class FSImageFormat {
     
     // get quota only when the node is a directory
     long nsQuota = -1L;
-    if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
+      if (LayoutVersion.supports(Feature.NAMESPACE_QUOTA, imgVersion)
+          && blocks == null && numBlocks == -1) {
         nsQuota = in.readLong();
       }
       long dsQuota = -1L;
-      if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
+      if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imgVersion)
+          && blocks == null && numBlocks == -1) {
         dsQuota = in.readLong();
       }
   
       // Read the symlink only when the node is a symlink
       String symlink = "";
-      if (imgVersion <= -23 && numBlocks == -2) {
+      if (numBlocks == -2) {
         symlink = Text.readString(in);
       }
       
@@ -390,7 +397,7 @@ class FSImageFormat {
 
     private void loadDatanodes(DataInputStream in)
         throws IOException {
-      long imgVersion = getLayoutVersion();
+      int imgVersion = getLayoutVersion();
 
       if (imgVersion > -3) // pre datanode image version
         return;
@@ -407,7 +414,7 @@ class FSImageFormat {
     private void loadFilesUnderConstruction(DataInputStream in)
     throws IOException {
       FSDirectory fsDir = namesystem.dir;
-      long imgVersion = getLayoutVersion();
+      int imgVersion = getLayoutVersion();
       if (imgVersion > -13) // pre lease image version
         return;
       int size = in.readInt();
@@ -435,9 +442,9 @@ class FSImageFormat {
 
     private void loadSecretManagerState(DataInputStream in)
         throws IOException {
-      long imgVersion = getLayoutVersion();
+      int imgVersion = getLayoutVersion();
 
-      if (imgVersion > -23) {
+      if (!LayoutVersion.supports(Feature.DELEGATION_TOKEN, imgVersion)) {
         //SecretManagerState is not available.
         //This must not happen if security is turned on.
         return; 
@@ -445,15 +452,15 @@ class FSImageFormat {
       namesystem.loadSecretManagerState(in);
     }
 
-    private long getLayoutVersion() {
+    private int getLayoutVersion() {
       return namesystem.getFSImage().getStorage().getLayoutVersion();
     }
 
     private long readNumFiles(DataInputStream in)
         throws IOException {
-      long imgVersion = getLayoutVersion();
+      int imgVersion = getLayoutVersion();
 
-      if (imgVersion <= -16) {
+      if (LayoutVersion.supports(Feature.NAMESPACE_QUOTA, imgVersion)) {
         return in.readLong();
       } else {
         return in.readInt();

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jun  7 01:11:15 2011
@@ -2288,6 +2288,8 @@ public class FSNamesystem implements FSC
       String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
       IOException, UnresolvedLinkException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
+    
+    assert !isInSafeMode();
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
@@ -2409,9 +2411,15 @@ public class FSNamesystem implements FSC
   }
 
   Lease reassignLease(Lease lease, String src, String newHolder,
-                      INodeFileUnderConstruction pendingFile) {
+      INodeFileUnderConstruction pendingFile) throws IOException {
     if(newHolder == null)
       return lease;
+    logReassignLease(lease.getHolder(), src, newHolder);
+    return reassignLeaseInternal(lease, src, newHolder, pendingFile);
+  }
+  
+  Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
+      INodeFileUnderConstruction pendingFile) throws IOException {
     pendingFile.setClientName(newHolder);
     return leaseManager.reassignLease(lease, src, newHolder);
   }
@@ -5371,6 +5379,17 @@ public class FSNamesystem implements FSC
     getEditLog().logSync();
   }
   
+  private void logReassignLease(String leaseHolder, String src,
+      String newHolder) throws IOException {
+    writeLock();
+    try {
+      getEditLog().logReassignLease(leaseHolder, src, newHolder);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+  }
+  
   /**
    * 
    * @return true if delegation token operation is allowed
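
The reassignLease()/reassignLeaseInternal() split above follows the usual namenode pattern: the public method journals the operation (logReassignLease) and then delegates to an Internal variant that performs only the in-memory mutation, so FSEditLogLoader's OP_REASSIGN_LEASE case can replay the mutation without writing the op back into the journal. A schematic sketch with illustrative names, not the real FSNamesystem API:

// Schematic "journal first, then mutate" split.
class LeaseOpsSketch {
  interface Journal {
    void logReassign(String holder, String src, String newHolder);
  }

  private final Journal editLog;

  LeaseOpsSketch(Journal editLog) { this.editLog = editLog; }

  // Live-namenode path: record the edit, then apply it in memory.
  void reassign(String holder, String src, String newHolder) {
    editLog.logReassign(holder, src, newHolder);
    reassignInternal(src, newHolder);
  }

  // Shared with edit-log replay, which must not journal the op again.
  void reassignInternal(String src, String newHolder) {
    System.out.println(src + " now held by " + newHolder);
  }

  public static void main(String[] args) {
    Journal journal = new Journal() {
      public void logReassign(String h, String s, String n) {
        System.out.println("journaled: " + h + " -> " + n + " on " + s);
      }
    };
    new LeaseOpsSketch(journal).reassign("client-1", "/tmp/f", "HDFS_NameNode");
  }
}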

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jun  7 01:11:15 2011
@@ -89,8 +89,10 @@ public class FileChecksumServlets {
       final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
       xml.declaration();
 
+      final ServletContext context = getServletContext();
+      final DataNode datanode = (DataNode) context.getAttribute("datanode");
       final Configuration conf = 
-        new HdfsConfiguration(DataNode.getDataNode().getConf());
+        new HdfsConfiguration(datanode.getConf());
       final int socketTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsConstants.READ_TIMEOUT);
@@ -99,7 +101,7 @@ public class FileChecksumServlets {
       
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
-            DataNode.getDataNode(), conf, getUGI(request, conf));
+            datanode, conf, getUGI(request, conf));
         final ClientProtocol nnproxy = dfs.getNamenode();
         final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
             filename, nnproxy, socketFactory, socketTimeout);

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Tue Jun  7 01:11:15 2011
@@ -33,6 +33,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 /**
@@ -45,8 +47,8 @@ import static org.apache.hadoop.hdfs.ser
  * 2.1) Get the datanodes which contains b
  * 2.2) Assign one of the datanodes as the primary datanode p
 
- * 2.3) p obtains a new generation stamp form the namenode
- * 2.4) p get the block info from each datanode
+ * 2.3) p obtains a new generation stamp from the namenode
+ * 2.4) p gets the block info from each datanode
  * 2.5) p computes the minimum block length
  * 2.6) p updates the datanodes, which have a valid generation stamp,
  *      with the new generation stamp and the minimum block length 
@@ -377,7 +379,7 @@ public class LeaseManager {
 
 
         try {
-          Thread.sleep(2000);
+          Thread.sleep(HdfsConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
         } catch(InterruptedException ie) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(name + " is interrupted", ie);
@@ -406,13 +408,14 @@ public class LeaseManager {
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+          if(fsnamesystem.internalReleaseLease(oldest, p, HdfsConstants.NAMENODE_LEASE_HOLDER)) {
             LOG.info("Lease recovery for file " + p +
                           " is complete. File closed.");
             removing.add(p);
-          } else
+          } else {
             LOG.info("Started block recovery for file " + p +
                           " lease " + oldest);
+          }
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Tue Jun  7 01:11:15 2011
@@ -46,6 +46,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -185,8 +187,8 @@ public class NNStorage extends Storage i
     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
     try {
       oldFile.seek(0);
-      int odlVersion = oldFile.readInt();
-      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+      int oldVersion = oldFile.readInt();
+      if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
         return false;
     } finally {
       oldFile.close();
@@ -570,8 +572,8 @@ public class NNStorage extends Storage i
                             + sd.getRoot() + " is not formatted.");
     }
 
-    // No Block pool ID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    // Set Block pool ID in version with federation support
+    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       String sbpid = props.getProperty("blockpoolID");
       setBlockPoolID(sd.getRoot(), sbpid);
     }
@@ -599,8 +601,8 @@ public class NNStorage extends Storage i
                            StorageDirectory sd
                            ) throws IOException {
     super.setFields(props, sd);
-    // Set blockpoolID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
-    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+    // Set blockpoolID in version with federation support
+    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       props.setProperty("blockpoolID", blockpoolID);
     }
     boolean uState = getDistributedUpgradeState();
@@ -610,6 +612,18 @@ public class NNStorage extends Storage i
       props.setProperty("distributedUpgradeVersion",
                         Integer.toString(uVersion));
     }
+    /* TODO: resolve merge here.
+    if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, layoutVersion)) {
+    // TODO && ! transactional storage
+      // Though the current NN supports this feature, this function
+      // is called with old layoutVersions from the upgrade tests.
+      
+        // May be null on the first save after an upgrade.
+        imageDigest = MD5Hash.digest(
+            new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE)));
+      }
+      props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
+    } */
   }
   
   static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
@@ -922,7 +936,7 @@ public class NNStorage extends Storage i
       throw new InconsistentFSStateException(storage,
           "Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
     }
-    blockpoolID = bpid;
+    setBlockPoolID(bpid);
   }
   
   public String getBlockPoolID() {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jun  7 01:11:15 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
@@ -1468,7 +1469,10 @@ public class NameNode implements Namenod
                  FSNamesystem.getNamespaceEditsDirs(conf);
     for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
       File curDir = new File(it.next().getPath());
-      if (!curDir.exists())
+      // It is fine for a dir not to exist, or to exist (and be properly
+      // accessible) and be completely empty.
+      if (!curDir.exists() ||
+          (curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
         continue;
       if (isConfirmationNeeded) {
         if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {

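The format-time check above stops prompting for storage directories that are
absent, or present but empty. A sketch of the condition as a standalone
helper, assuming (as the import added above suggests) that
org.apache.hadoop.fs.FileUtil.listFiles(File) returns the directory's
children and throws an IOException instead of returning null on failure:

    // Hypothetical helper illustrating the new condition.
    private static boolean formatWithoutPrompt(File dir) throws IOException {
      // A missing dir and an empty dir both hold no data worth confirming.
      return !dir.exists()
          || (dir.isDirectory() && FileUtil.listFiles(dir).length == 0);
    }
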
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Tue Jun  7 01:11:15 2011
@@ -268,7 +268,7 @@ class NamenodeJspHelper {
       long nonDFS = total - remaining - used;
       nonDFS = nonDFS < 0 ? 0 : nonDFS;
       float percentUsed = DFSUtil.getPercentUsed(used, total);
-      float percentRemaining = DFSUtil.getPercentRemaining(used, total);
+      float percentRemaining = DFSUtil.getPercentRemaining(remaining, total);
       float median = 0;
       float max = 0;
       float min = 0;

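The one-word fix above matters for the reported number. Assuming
DFSUtil.getPercentRemaining(remaining, total) computes remaining / total * 100:
with total = 100 GB, used = 30 GB and remaining = 60 GB (10 GB non-DFS), the
old call reported "30% remaining" because it passed used; the corrected call
reports 60%.
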
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Jun  7 01:11:15 2011
@@ -425,7 +425,7 @@ public class SecondaryNameNode implement
       throw new IOException("This is not a DFS");
     }
 
-    String configuredAddress = DFSUtil.getInfoServer(null, conf);
+    String configuredAddress = DFSUtil.getInfoServer(null, conf, true);
     InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
     if (sockAddr.getAddress().isAnyLocalAddress()) {
       if(UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Tue Jun  7 01:11:15 2011
@@ -20,10 +20,10 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.net.InetSocketAddress;
 import java.util.Enumeration;
 import java.util.List;
 
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -46,14 +46,14 @@ public class StreamFile extends DfsServl
 
   public static final String CONTENT_LENGTH = "Content-Length";
 
-  static DataNode datanode = DataNode.getDataNode();
-  
   /** getting a client for connecting to dfs */
   protected DFSClient getDFSClient(HttpServletRequest request)
       throws IOException, InterruptedException {
     final Configuration conf =
       (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
     UserGroupInformation ugi = getUGI(request, conf);
+    final ServletContext context = getServletContext();
+    final DataNode datanode = (DataNode) context.getAttribute("datanode");
     return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
   }
   

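StreamFile previously captured the DataNode in a static field at class-load
time; it now looks the instance up from the ServletContext on each request,
which lets several DataNodes in one JVM (as MiniDFSCluster creates) each
serve their own servlets. A minimal sketch of the pattern, assuming the
embedding HTTP server stored the instance under the "datanode" attribute when
it registered the servlet:

    // Server-side setup (wherever the servlet context is built):
    //   context.setAttribute("datanode", datanode);

    public class ExampleServlet extends javax.servlet.http.HttpServlet {
      @Override
      protected void doGet(javax.servlet.http.HttpServletRequest req,
                           javax.servlet.http.HttpServletResponse resp)
          throws javax.servlet.ServletException, java.io.IOException {
        // Fetch the per-context instance instead of a JVM-wide static.
        DataNode dn = (DataNode) getServletContext().getAttribute("datanode");
        resp.getWriter().println("Served by datanode " + dn);
      }
    }
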
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSck.java Tue Jun  7 01:11:15 2011
@@ -229,7 +229,7 @@ public class DFSck extends Configured im
     InetSocketAddress namenode = 
       NameNode.getAddress(dfs.getUri().getAuthority());
     
-    return DFSUtil.getInfoServer(namenode, conf);
+    return DFSUtil.getInfoServer(namenode, conf, true);
   }
 
   private int doWork(final String[] args) throws IOException {
@@ -250,13 +250,8 @@ public class DFSck extends Configured im
     url.append(namenodeAddress);
     System.err.println("Connecting to namenode via " + url.toString());
     
-    url.append("/fsck?ugi=").append(ugi.getShortUserName()).append("&path=");
-    String dir = "/";
-    // find top-level dir first
-    for (int idx = 0; idx < args.length; idx++) {
-      if (!args[idx].startsWith("-")) { dir = args[idx]; break; }
-    }
-    url.append(URLEncoder.encode(dir, "UTF-8"));
+    url.append("/fsck?ugi=").append(ugi.getShortUserName());
+    String dir = null;
     boolean doListCorruptFileBlocks = false;
     for (int idx = 0; idx < args.length; idx++) {
       if (args[idx].equals("-move")) { url.append("&move=1"); }
@@ -269,8 +264,25 @@ public class DFSck extends Configured im
       else if (args[idx].equals("-list-corruptfileblocks")) {
         url.append("&listcorruptfileblocks=1");
         doListCorruptFileBlocks = true;
+      } else if (!args[idx].startsWith("-")) {
+        if (null == dir) {
+          dir = args[idx];
+        } else {
+          System.err.println("fsck: can only operate on one path at a time '"
+              + args[idx] + "'");
+          printUsage();
+          return -1;
+        }
+      } else {
+        System.err.println("fsck: Illegal option '" + args[idx] + "'");
+        printUsage();
+        return -1;
       }
     }
+    if (null == dir) {
+      dir = "/";
+    }
+    url.append("&path=").append(URLEncoder.encode(dir, "UTF-8"));
     if (doListCorruptFileBlocks) {
       return listCorruptFileBlocks(dir, url.toString());
     }

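The fsck argument loop above now rejects unknown flags and multiple paths
instead of silently picking the first non-flag argument, and the path is
appended to the URL only after all options have been seen. Illustrative
behavior (command names and paths are hypothetical):

    hdfs fsck /user/foo -files    checks /user/foo
    hdfs fsck /a /b               fails: can only operate on one path at a time
    hdfs fsck -bogus              fails: Illegal option '-bogus'
    hdfs fsck                     defaults to /
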
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/GetConf.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/GetConf.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/GetConf.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/GetConf.java Tue Jun  7 01:11:15 2011
@@ -67,7 +67,10 @@ public class GetConf extends Configured 
     EXCLUDE_FILE("-excludeFile",
         new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE"),
         "gets the exclude file path that defines the datanodes " +
-        "that need to decommissioned.");
+        "that need to decommissioned."),
+    NNRPCADDRESSES("-nnRpcAddresses", 
+        new NNRpcAddressesCommandHandler(),
+        "gets the namenode rpc addresses");
 
     private final String cmd;
     private final CommandHandler handler;
@@ -179,6 +182,27 @@ public class GetConf extends Configured 
     }
   }
   
+  /**
+   * Handler for {@link Command#NNRPCADDRESSES}.
+   * If namenode service RPC addresses are defined in the configuration,
+   * print them; otherwise, print an error and return a non-zero status.
+   */
+  static class NNRpcAddressesCommandHandler extends CommandHandler {
+    @Override
+    public int doWorkInternal(GetConf tool) throws IOException {
+      Configuration config = tool.getConf();
+      List<InetSocketAddress> rpclist = DFSUtil.getNNServiceRpcAddresses(config);
+      if (rpclist != null) {
+        for (InetSocketAddress rpc : rpclist) {
+          tool.printOut(rpc.getHostName() + ":" + rpc.getPort());
+        }
+        return 0;
+      }
+      tool.printError("Did not get namenode service rpc addresses.");
+      return -1;
+    }
+  }
+  
   private final PrintStream out; // Stream for printing command output
   private final PrintStream err; // Stream for printing error
 

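The new -nnRpcAddresses option prints one host:port line per configured
namenode service RPC address, or an error when none can be determined. A
hypothetical invocation (host names invented for illustration):

    bin/hdfs getconf -nnRpcAddresses
    nn1.example.com:8020
    nn2.example.com:8020
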
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Tue Jun  7 01:11:15 2011
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
@@ -39,9 +41,8 @@ import static org.apache.hadoop.hdfs.too
 @InterfaceStability.Unstable
 class EditsLoaderCurrent implements EditsLoader {
 
-  private static int[] supportedVersions = {
-      -18, -19, -20, -21, -22, -23, -24,
-      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37 };
+  private static int[] supportedVersions = { -18, -19, -20, -21, -22, -23, -24,
+      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38 };
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -253,11 +254,6 @@ class EditsLoaderCurrent implements Edit
   private void visit_OP_RENAME() throws IOException {
     visitTxId();
 
-    if(editsVersion > -21) {
-      throw new IOException("Unexpected op code " + FSEditLogOpCodes.OP_RENAME
-        + " for edit log version " + editsVersion
-        + " (op code 15 only expected for 21 and later)");
-    }
     v.visitInt(           EditsElement.LENGTH);
     v.visitStringUTF8(    EditsElement.SOURCE);
     v.visitStringUTF8(    EditsElement.DESTINATION);
@@ -271,12 +267,6 @@ class EditsLoaderCurrent implements Edit
   private void visit_OP_CONCAT_DELETE() throws IOException {
     visitTxId();
 
-    if(editsVersion > -22) {
-      throw new IOException("Unexpected op code "
-        + FSEditLogOpCodes.OP_CONCAT_DELETE
-        + " for edit log version " + editsVersion
-        + " (op code 16 only expected for 22 and later)");
-    }
     IntToken lengthToken = v.visitInt(EditsElement.LENGTH);
     v.visitStringUTF8(EditsElement.CONCAT_TARGET);
     // all except of CONCAT_TARGET and TIMESTAMP
@@ -314,21 +304,15 @@ class EditsLoaderCurrent implements Edit
   private void visit_OP_GET_DELEGATION_TOKEN() throws IOException {
     visitTxId();
     
-    if(editsVersion > -24) {
-      throw new IOException("Unexpected op code "
-          + FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN
-          + " for edit log version " + editsVersion
-          + " (op code 18 only expected for 24 and later)");
-    }
-    v.visitByte(       EditsElement.T_VERSION);
-    v.visitStringText( EditsElement.T_OWNER);
-    v.visitStringText( EditsElement.T_RENEWER);
-    v.visitStringText( EditsElement.T_REAL_USER);
-    v.visitVLong(      EditsElement.T_ISSUE_DATE);
-    v.visitVLong(      EditsElement.T_MAX_DATE);
-    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
-    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
-    v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
+    v.visitByte(       EditsElement.T_VERSION);
+    v.visitStringText( EditsElement.T_OWNER);
+    v.visitStringText( EditsElement.T_RENEWER);
+    v.visitStringText( EditsElement.T_REAL_USER);
+    v.visitVLong(      EditsElement.T_ISSUE_DATE);
+    v.visitVLong(      EditsElement.T_MAX_DATE);
+    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
+    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
+    v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
   }
 
   /**
@@ -338,21 +322,15 @@ class EditsLoaderCurrent implements Edit
     throws IOException {
     visitTxId();
 
-    if(editsVersion > -24) {
-      throw new IOException("Unexpected op code "
-          + FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN
-          + " for edit log version " + editsVersion
-          + " (op code 19 only expected for 24 and later)");
-    }
-    v.visitByte(       EditsElement.T_VERSION);
-    v.visitStringText( EditsElement.T_OWNER);
-    v.visitStringText( EditsElement.T_RENEWER);
-    v.visitStringText( EditsElement.T_REAL_USER);
-    v.visitVLong(      EditsElement.T_ISSUE_DATE);
-    v.visitVLong(      EditsElement.T_MAX_DATE);
-    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
-    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
-    v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
+    v.visitByte(       EditsElement.T_VERSION);
+    v.visitStringText( EditsElement.T_OWNER);
+    v.visitStringText( EditsElement.T_RENEWER);
+    v.visitStringText( EditsElement.T_REAL_USER);
+    v.visitVLong(      EditsElement.T_ISSUE_DATE);
+    v.visitVLong(      EditsElement.T_MAX_DATE);
+    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
+    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
+    v.visitStringUTF8( EditsElement.T_EXPIRY_TIME);
   }
 
   /**
@@ -362,20 +340,14 @@ class EditsLoaderCurrent implements Edit
     throws IOException {
     visitTxId();
 
-    if(editsVersion > -24) {
-      throw new IOException("Unexpected op code "
-          + FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN
-          + " for edit log version " + editsVersion
-          + " (op code 20 only expected for 24 and later)");
-    }
-    v.visitByte(       EditsElement.T_VERSION);
-    v.visitStringText( EditsElement.T_OWNER);
-    v.visitStringText( EditsElement.T_RENEWER);
-    v.visitStringText( EditsElement.T_REAL_USER);
-    v.visitVLong(      EditsElement.T_ISSUE_DATE);
-    v.visitVLong(      EditsElement.T_MAX_DATE);
-    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
-    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
+    v.visitByte(       EditsElement.T_VERSION);
+    v.visitStringText( EditsElement.T_OWNER);
+    v.visitStringText( EditsElement.T_RENEWER);
+    v.visitStringText( EditsElement.T_REAL_USER);
+    v.visitVLong(      EditsElement.T_ISSUE_DATE);
+    v.visitVLong(      EditsElement.T_MAX_DATE);
+    v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
+    v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
   }
 
   /**
@@ -385,18 +357,21 @@ class EditsLoaderCurrent implements Edit
     throws IOException {
     visitTxId();
     
-    if(editsVersion > -24) {
-      throw new IOException("Unexpected op code "
-          + FSEditLogOpCodes.OP_UPDATE_MASTER_KEY
-          + " for edit log version " + editsVersion
-          + "(op code 21 only expected for 24 and later)");
-    }
-    v.visitVInt(  EditsElement.KEY_ID);
-    v.visitVLong( EditsElement.KEY_EXPIRY_DATE);
-    VIntToken blobLengthToken = v.visitVInt(EditsElement.KEY_LENGTH);
-    v.visitBlob(EditsElement.KEY_BLOB, blobLengthToken.value);
+    v.visitVInt(  EditsElement.KEY_ID);
+    v.visitVLong( EditsElement.KEY_EXPIRY_DATE);
+    VIntToken blobLengthToken = v.visitVInt(EditsElement.KEY_LENGTH);
+    v.visitBlob(EditsElement.KEY_BLOB, blobLengthToken.value);
   }
   
+  private void visit_OP_REASSIGN_LEASE()
+    throws IOException {
+    visitTxId();
+
+    v.visitStringUTF8(EditsElement.CLIENT_NAME); // lease holder being replaced
+    v.visitStringUTF8(EditsElement.PATH);
+    v.visitStringUTF8(EditsElement.CLIENT_NAME); // new lease holder
+  }
+
   /**
    * Visit OP_BEGIN_LOG_SEGMENT
    */
@@ -404,7 +379,7 @@ class EditsLoaderCurrent implements Edit
     throws IOException {
     visitTxId();
   }
-
+  
   /**
    * Visit OP_END_LOG_SEGMENT
    */
@@ -474,10 +449,13 @@ class EditsLoaderCurrent implements Edit
       case OP_UPDATE_MASTER_KEY: // 21
         visit_OP_UPDATE_MASTER_KEY();
         break;
-      case OP_END_LOG_SEGMENT: // 22
+      case OP_REASSIGN_LEASE: // 22
+        visit_OP_REASSIGN_LEASE();
+        break;
+      case OP_END_LOG_SEGMENT: // 23
         visit_OP_END_LOG_SEGMENT();
         break;        
-      case OP_START_LOG_SEGMENT: // 23
+      case OP_START_LOG_SEGMENT: // 24
         visit_OP_BEGIN_LOG_SEGMENT();
         break;
       default:
@@ -527,7 +505,8 @@ class EditsLoaderCurrent implements Edit
 
         v.leaveEnclosingElement(); // DATA
         
-        if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion  <= -28) {
+        if (editsOpCode != FSEditLogOpCodes.OP_INVALID && 
+            LayoutVersion.supports(Feature.EDITS_CHESKUM, editsVersion)) {
           v.visitInt(EditsElement.CHECKSUM);
         }
         v.leaveEnclosingElement(); // RECORD

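With the per-opcode version guards gone, visit_OP_GET_DELEGATION_TOKEN,
visit_OP_RENEW_DELEGATION_TOKEN and visit_OP_CANCEL_DELEGATION_TOKEN now have
near-identical bodies (cancel omits the expiry time). A follow-up could fold
them into one helper; a sketch, not part of this commit:

    private void visitDelegationTokenFields(boolean withExpiryTime)
        throws IOException {
      v.visitByte(       EditsElement.T_VERSION);
      v.visitStringText( EditsElement.T_OWNER);
      v.visitStringText( EditsElement.T_RENEWER);
      v.visitStringText( EditsElement.T_REAL_USER);
      v.visitVLong(      EditsElement.T_ISSUE_DATE);
      v.visitVLong(      EditsElement.T_MAX_DATE);
      v.visitVInt(       EditsElement.T_SEQUENCE_NUMBER);
      v.visitVInt(       EditsElement.T_MASTER_KEY_ID);
      if (withExpiryTime) {
        v.visitStringUTF8(EditsElement.T_EXPIRY_TIME);
      }
    }
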
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Tue Jun  7 01:11:15 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
@@ -121,7 +123,7 @@ class ImageLoaderCurrent implements Imag
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35 };
+      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36 };
   private int imageVersion = 0;
 
   /* (non-Javadoc)
@@ -157,10 +159,11 @@ class ImageLoaderCurrent implements Imag
       v.visit(ImageElement.GENERATION_STAMP, in.readLong());
 
       if (imageVersion <= FSConstants.FIRST_STORED_TXIDS_VERSION) {
+        // TODO: use the LayoutVersion class here as well
         v.visit(ImageElement.TRANSACTION_ID, in.readLong());
       }
 
-      if (imageVersion <= -25) {
+      if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) {
         boolean isCompressed = in.readBoolean();
        v.visit(ImageElement.IS_COMPRESSED, String.valueOf(isCompressed));
         if (isCompressed) {
@@ -180,7 +183,7 @@ class ImageLoaderCurrent implements Imag
 
       processINodesUC(in, v, skipBlocks);
 
-      if (imageVersion <= -24) {
+      if (LayoutVersion.supports(Feature.DELEGATION_TOKEN, imageVersion)) {
         processDelegationTokens(in, v);
       }
       
@@ -339,7 +342,7 @@ class ImageLoaderCurrent implements Imag
     v.visitEnclosingElement(ImageElement.INODES,
         ImageElement.NUM_INODES, numInodes);
     
-    if (imageVersion <= -30) { // local file name
+    if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION, imageVersion)) {
       processLocalNameINodes(in, v, numInodes, skipBlocks);
     } else { // full path name
       processFullNameINodes(in, v, numInodes, skipBlocks);
@@ -401,7 +404,6 @@ class ImageLoaderCurrent implements Imag
     * @param v visitor
     * @param skipBlocks skip blocks or not
     * @param parentName the name of its parent node
-    * @return the number of Children
     * @throws IOException
     */
   private void processINode(DataInputStream in, ImageVisitor v,
@@ -418,7 +420,7 @@ class ImageLoaderCurrent implements Imag
     v.visit(ImageElement.INODE_PATH, pathName);
     v.visit(ImageElement.REPLICATION, in.readShort());
     v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
-    if(imageVersion <= -17) // added in version -17
+    if(LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imageVersion))
       v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
     v.visit(ImageElement.BLOCK_SIZE, in.readLong());
     int numBlocks = in.readInt();
@@ -428,10 +430,10 @@ class ImageLoaderCurrent implements Imag
     // File or directory
     if (numBlocks > 0 || numBlocks == -1) {
       v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
-      if(imageVersion <= -18) // added in version -18
+      if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imageVersion))
         v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
     }
-    if (imageVersion <= -23 && numBlocks == -2) {
+    if (numBlocks == -2) {
       v.visit(ImageElement.SYMLINK, Text.readString(in));
     }
 

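The same supports() idiom replaces the image loader's remembered cut-over
numbers. Since layout versions decrease over time, the old and new forms of
the compression check are equivalent given the -25 introduction point that
the removed line encodes:

    if (imageVersion <= -25) { ... }                                  // old
    if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION,
                               imageVersion)) { ... }                 // new

so an image written at layout -26 carries the is-compressed flag, while one
written at -24 does not.
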
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java Tue Jun  7 01:11:15 2011
@@ -47,8 +47,7 @@ public class OfflineImageViewer {
     "saving the results in OUTPUTFILE.\n" +
     "\n" +
     "The oiv utility will attempt to parse correctly formed image files\n" +
-    "and will abort fail with mal-formed image files. Currently the\n" +
-    "supports FSImage layout versions -16 through -23.\n" +
+    "and will abort fail with mal-formed image files.\n" +
     "\n" +
     "The tool works offline and does not require a running cluster in\n" +
     "order to process an image file.\n" +

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Tue Jun  7 01:11:15 2011
@@ -50,6 +50,8 @@ public class TestFiDataTransferProtocol2
   static final int MIN_N_PACKET = 3;
   static final int MAX_N_PACKET = 10;
 
+  static final int MAX_SLEEP = 1000;
+
   static final Configuration conf = new Configuration();
   static {
     conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
@@ -89,7 +91,7 @@ public class TestFiDataTransferProtocol2
         + ", lastPacketSize=" + lastPacketSize);
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
-        ).numDataNodes(REPLICATION + 1).build();
+        ).numDataNodes(REPLICATION + 2).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");
@@ -128,12 +130,11 @@ public class TestFiDataTransferProtocol2
   private void runTest17_19(String methodName, int dnIndex)
       throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final int maxSleep = 3000;
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
-    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
-    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
-    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP));
     t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
     t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
     writeSeveralPackets(methodName);
@@ -142,12 +143,11 @@ public class TestFiDataTransferProtocol2
 
   private void runTest29_30(String methodName, int dnIndex) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final int maxSleep = 3000;
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
-    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
-    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
-    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP));
     t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
     t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
     writeSeveralPackets(methodName);
@@ -209,7 +209,7 @@ public class TestFiDataTransferProtocol2
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
-    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, MAX_SLEEP));
     writeSeveralPackets(methodName);
   }
 
@@ -223,7 +223,7 @@ public class TestFiDataTransferProtocol2
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
-    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, MAX_SLEEP));
     writeSeveralPackets(methodName);
   }
   
@@ -237,7 +237,7 @@ public class TestFiDataTransferProtocol2
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
-    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, MAX_SLEEP));
     writeSeveralPackets(methodName);
   }
   

Propchange: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun  7 01:11:15 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/test/hdfs:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:1086482-1128452
+/hadoop/hdfs/trunk/src/test/hdfs:1086482-1132839

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1132846&r1=1132845&r2=1132846&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jun  7 01:11:15 2011
@@ -1072,34 +1072,47 @@ public class MiniDFSCluster {
       nameNodes[nnIndex] = new NameNodeInfo(null, conf);
     }
   }
-
+  
+  /**
+   * Restart the namenode.
+   */
+  public synchronized void restartNameNode() throws IOException {
+    checkSingleNameNode();
+    restartNameNode(true);
+  }
+  
+  /**
+   * Restart the namenode. Optionally wait for the cluster to become active.
+   */
+  public synchronized void restartNameNode(boolean waitActive)
+      throws IOException {
+    checkSingleNameNode();
+    restartNameNode(0, waitActive);
+  }
+  
   /**
-   * Restart namenode at a given index.
+   * Restart the namenode at a given index.
    */
   public synchronized void restartNameNode(int nnIndex) throws IOException {
+    restartNameNode(nnIndex, true);
+  }
+
+  /**
+   * Restart the namenode at a given index. Optionally wait for the cluster
+   * to become active.
+   */
+  public synchronized void restartNameNode(int nnIndex, boolean waitActive)
+      throws IOException {
     Configuration conf = nameNodes[nnIndex].conf;
     shutdownNameNode(nnIndex);
     NameNode nn = NameNode.createNameNode(new String[] {}, conf);
     nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
-    waitClusterUp();
-    System.out.println("Restarted the namenode");
-    int failedCount = 0;
-    while (true) {
-      try {
-        waitActive();
-        break;
-      } catch (IOException e) {
-        failedCount++;
-        // Cached RPC connection to namenode, if any, is expected to fail once
-        if (failedCount > 1) {
-          System.out.println("Tried waitActive() " + failedCount
-              + " time(s) and failed, giving up.  "
-              + StringUtils.stringifyException(e));
-          throw e;
-        }
-      }
+    if (waitActive) {
+      waitClusterUp();
+      System.out.println("Restarted the namenode");
+      waitActive();
+      System.out.println("Cluster is active");
     }
-    System.out.println("Cluster is active");
   }
 
   /**
@@ -1420,7 +1433,22 @@ public class MiniDFSCluster {
    */
   public void waitActive() throws IOException {
     for (int index = 0; index < nameNodes.length; index++) {
-      waitActive(index);
+      int failedCount = 0;
+      while (true) {
+        try {
+          waitActive(index);
+          break;
+        } catch (IOException e) {
+          failedCount++;
+          // Cached RPC connection to namenode, if any, is expected to fail once
+          if (failedCount > 1) {
+            System.out.println("Tried waitActive() " + failedCount
+                + " time(s) and failed, giving up.  "
+                + StringUtils.stringifyException(e));
+            throw e;
+          }
+        }
+      }
     }
   }
   
@@ -1565,7 +1593,7 @@ public class MiniDFSCluster {
   /**
    * Set the softLimit and hardLimit of client lease periods
    */
-  void setLeasePeriod(long soft, long hard) {
+  public void setLeasePeriod(long soft, long hard) {
     final FSNamesystem namesystem = getNamesystem();
     namesystem.leaseManager.setLeasePeriod(soft, hard);
     namesystem.lmthread.interrupt();