Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/16 23:07:38 UTC

svn commit: r1532910 [3/4] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apac...

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Oct 16 21:07:28 2013
@@ -123,6 +123,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -165,14 +166,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -214,6 +208,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.RetryCache.CacheEntry;
 import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
@@ -242,6 +237,7 @@ import org.mortbay.util.ajax.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 /**
@@ -462,6 +458,11 @@ public class FSNamesystem implements Nam
   private HAContext haContext;
 
   private final boolean haEnabled;
+  
+  /**
+   * Whether the namenode is in the middle of starting the active service
+   */
+  private volatile boolean startingActiveService = false;
     
   private INodeId inodeId;
   
@@ -910,6 +911,7 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   void startActiveServices() throws IOException {
+    startingActiveService = true;
     LOG.info("Starting services required for active state");
     writeLock();
     try {
@@ -964,8 +966,19 @@ public class FSNamesystem implements Nam
       nnrmthread.start();
     } finally {
       writeUnlock();
+      startingActiveService = false;
     }
   }
+  
+  /**
+   * @return Whether the namenode is transitioning to the active state and is
+   *         in the middle of {@link #startActiveServices()}
+   */
+  public boolean inTransitionToActive() {
+    return haEnabled && haContext != null
+        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
+        && startingActiveService;
+  }
 
   private boolean shouldUseDelegationTokens() {
     return UserGroupInformation.isSecurityEnabled() ||
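
The transition flag above is written by the HA state-transition thread and read, without a lock, by RPC handler threads, which is why it is volatile rather than guarded by the FSNamesystem lock. A minimal sketch of the pattern in isolation (the field and method names mirror the ones above; everything else is illustrative):

    public class TransitionFlag {
      // volatile: RPC handler threads must observe the HA thread's writes
      private volatile boolean startingActiveService = false;

      void startActiveServices() {
        startingActiveService = true;
        try {
          // ... tail edits, start lease/replication services ...
        } finally {
          startingActiveService = false;  // cleared even on startup failure
        }
      }

      boolean inTransitionToActive() {
        return startingActiveService;     // lock-free read
      }
    }
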
@@ -1058,6 +1071,26 @@ public class FSNamesystem implements Nam
     }
   }
   
+  /**
+   * @throws RetriableException
+   *           If 1) the NameNode is in SafeMode, 2) HA is enabled, and 3)
+   *           the NameNode is in the active state
+   * @throws SafeModeException
+   *           Otherwise, if the NameNode is in SafeMode.
+   */
+  private void checkNameNodeSafeMode(String errorMsg)
+      throws RetriableException, SafeModeException {
+    if (isInSafeMode()) {
+      SafeModeException se = new SafeModeException(errorMsg, safeMode);
+      if (haEnabled && haContext != null
+          && haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+        throw new RetriableException(se);
+      } else {
+        throw se;
+      }
+    }
+  }
+  
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
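
checkNameNodeSafeMode centralizes a pattern repeated throughout the rest of this patch: on an HA-enabled NameNode that is already active, safe mode is normally transient (block reports are still arriving after a failover), so the client receives a RetriableException wrapping the SafeModeException rather than a hard failure. A hand-rolled sketch of what a retry-aware caller does with that; the real HDFS client drives this through its configured RetryPolicy rather than an explicit loop like this:

    import java.util.concurrent.Callable;
    import org.apache.hadoop.ipc.RetriableException;

    static <T> T invokeWithRetry(Callable<T> rpc) throws Exception {
      while (true) {
        try {
          return rpc.call();
        } catch (RetriableException e) {
          // active NameNode, transient safe mode: back off and resubmit
          Thread.sleep(1000);
        }
      }
    }
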
@@ -1359,9 +1392,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set permission for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set permission for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
@@ -1398,9 +1429,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set owner for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set owner for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       if (!pc.isSuperUser()) {
@@ -1480,8 +1509,14 @@ public class FSNamesystem implements Nam
       for (LocatedBlock b : ret.getLocatedBlocks()) {
         // if safemode & no block locations yet then throw safemodeException
         if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          throw new SafeModeException("Zero blocklocations for " + src,
-              safeMode);
+          SafeModeException se = new SafeModeException(
+              "Zero blocklocations for " + src, safeMode);
+          if (haEnabled && haContext != null && 
+              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+            throw new RetriableException(se);
+          } else {
+            throw se;
+          }
         }
       }
     }
@@ -1622,9 +1657,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot concat " + target, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot concat " + target);
       concatInternal(pc, target, srcs, logRetryCache);
       resultingStat = getAuditFileInfo(target, false);
     } finally {
@@ -1772,9 +1805,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set times " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set times " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       // Write access is required to set access and modification times
@@ -1801,16 +1832,16 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
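
The reordering here (and in renameTo, saveNamespace, endCheckpoint, updatePipeline, and the snapshot methods below) moves cheap argument validation ahead of RetryCache.waitForCompletion, so malformed requests never occupy a retry-cache entry. The surrounding idiom, sketched with the mutation elided:

    import java.io.IOException;
    import org.apache.hadoop.ipc.RetryCache;
    import org.apache.hadoop.ipc.RetryCache.CacheEntry;

    void idempotentOp(RetryCache retryCache) throws IOException {
      // (1) validate inputs first, before touching the cache
      CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
      if (cacheEntry != null && cacheEntry.isSuccess()) {
        return; // a retried RPC observes the first attempt's outcome
      }
      boolean success = false;
      try {
        // (2) ... perform the mutation ...
        success = true;
      } finally {
        RetryCache.setState(cacheEntry, success); // publish for retries
      }
    }
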
@@ -1837,9 +1868,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create symlink " + link, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create symlink " + link);
       link = FSDirectory.resolvePath(link, pathComponents, dir);
       if (!createParent) {
         verifyParentDir(link);
@@ -1897,9 +1926,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set replication for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set replication for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
@@ -2029,9 +2056,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       startFileInternal(pc, src, permissions, holder, clientMachine, create,
           overwrite, createParent, replication, blockSize, logRetryCache);
@@ -2250,10 +2275,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot recover the lease of " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot recover the lease of " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
@@ -2404,9 +2426,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot append to file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot append to file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
     } catch (StandbyException se) {
@@ -2556,9 +2576,7 @@ public class FSNamesystem implements Nam
     checkBlock(previous);
     onRetryBlock[0] = null;
     checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot add block to " + src, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot add block to " + src);
 
     // have we exceeded the configured limit of fs objects.
     checkFsObjectLimit();
@@ -2667,10 +2685,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.READ);
       //check safe mode
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add datanode; src=" + src
-            + ", blk=" + blk, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //check lease
@@ -2710,10 +2725,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot abandon block " + b +
-                                    " for fle" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot abandon block " + b + " for fle" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //
@@ -2796,9 +2808,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot complete file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot complete file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       success = completeFileInternal(src, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
@@ -2973,9 +2983,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       checkOperation(OperationCategory.WRITE);
@@ -3025,10 +3033,6 @@ public class FSNamesystem implements Nam
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -3036,8 +3040,13 @@ public class FSNamesystem implements Nam
     if (!DFSUtil.isValidName(dst)) {
       throw new InvalidPathException("Invalid name: " + dst);
     }
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
+    
     checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
@@ -3045,9 +3054,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, cacheEntry != null, options);
@@ -3153,9 +3160,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot delete " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot delete " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new IOException(src + " is non empty");
@@ -3374,9 +3379,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);   
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create directory " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create directory " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
@@ -3476,9 +3479,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set quota on " + path, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set quota on " + path);
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3501,9 +3502,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot fsync file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot fsync file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       if (lastBlockLength > 0) {
@@ -3707,6 +3706,39 @@ public class FSNamesystem implements Nam
   BlockInfo getStoredBlock(Block block) {
     return blockManager.getStoredBlock(block);
   }
+  
+  @Override
+  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
+    assert hasReadOrWriteLock();
+    final BlockCollection bc = blockUC.getBlockCollection();
+    if (bc == null || !(bc instanceof INodeFileUnderConstruction)) {
+      return false;
+    }
+
+    INodeFileUnderConstruction inodeUC = (INodeFileUnderConstruction) blockUC
+        .getBlockCollection();
+    String fullName = inodeUC.getName();
+    try {
+      if (fullName != null && fullName.startsWith(Path.SEPARATOR)
+          && dir.getINode(fullName) == inodeUC) {
+        // If file exists in normal path then no need to look in snapshot
+        return false;
+      }
+    } catch (UnresolvedLinkException e) {
+      LOG.error("Error while resolving the link : " + fullName, e);
+      return false;
+    }
+    /*
+     * 1. if bc is an instance of INodeFileUnderConstructionWithSnapshot, and
+     * bc is not in the current fsdirectory tree, bc must represent a snapshot
+     * file. 
+     * 2. if fullName is not an absolute path, bc cannot exist in the
+     * current fsdirectory tree.
+     * 3. if bc is not the current node associated with fullName, bc must be a
+     * snapshot inode.
+     */
+    return true;
+  }
 
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
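
The dir.getINode(fullName) == inodeUC test in isInSnapshot above relies on reference identity, not path equality: a snapshot copy of a file can still resolve under the same absolute path string, but only the live inode is the very object held in the current directory tree. Reduced to its essence (types widened to Object purely for illustration):

    // '==' on purpose: an equals()-style comparison could not tell a live
    // inode apart from a snapshot copy sharing the same path
    static boolean isLiveInode(Object currentTreeLookup, Object inodeUC) {
      return currentTreeLookup == inodeUC;
    }
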
@@ -3728,11 +3760,8 @@ public class FSNamesystem implements Nam
       // If a DN tries to commit to the standby, the recovery will
       // fail, and the next retry will succeed on the new NN.
   
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-          "Cannot commitBlockSynchronization while in safe mode",
-          safeMode);
-      }
+      checkNameNodeSafeMode(
+          "Cannot commitBlockSynchronization while in safe mode");
       final BlockInfo storedBlock = getStoredBlock(
           ExtendedBlock.getLocalBlock(lastblock));
       if (storedBlock == null) {
@@ -3885,9 +3914,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew lease for " + holder);
       leaseManager.renewLease(holder);
     } finally {
       writeUnlock();
@@ -3924,11 +3951,27 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    String startAfterString = new String(startAfter);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
+      // Get file name when startAfter is an INodePath
+      if (FSDirectory.isReservedName(startAfterString)) {
+        byte[][] startAfterComponents = FSDirectory
+            .getPathComponentsForReservedPath(startAfterString);
+        try {
+          String tmp = FSDirectory.resolvePath(src, startAfterComponents, dir);
+          byte[][] regularPath = INode.getPathComponents(tmp);
+          startAfter = regularPath[regularPath.length - 1];
+        } catch (IOException e) {
+          // Possibly the inode is deleted
+          throw new DirectoryListingStartAfterNotFoundException(
+              "Can't find startAfter " + startAfterString);
+        }
+      }
+      
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
           checkPathAccess(pc, src, FsAction.READ_EXECUTE);
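
When the listing cursor arrives as a reserved /.reserved/.inodes path, the code above resolves it to a regular path and keeps only the final component, since getListing expects startAfter to be a plain name within src; if the inode has meanwhile been deleted, the caller gets the new DirectoryListingStartAfterNotFoundException instead of an opaque resolution failure. The component extraction, isolated as a sketch in the same package, reusing the INode helper from the hunk above:

    static byte[] lastComponent(String resolvedPath) {
      // e.g. "/user/alice/part-0007" yields the bytes of "part-0007"
      byte[][] components = INode.getPathComponents(resolvedPath);
      return components[components.length - 1];
    }
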
@@ -4218,7 +4261,6 @@ public class FSNamesystem implements Nam
     return JSON.toString(info);
   }
 
-
   int getNumberOfDatanodes(DatanodeReportType type) {
     readLock();
     try {
@@ -4258,19 +4300,20 @@ public class FSNamesystem implements Nam
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+    checkSuperuserPrivilege();
+    
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkSuperuserPrivilege();
-    checkOperation(OperationCategory.UNCHECKED);
     boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.UNCHECKED);
       if (!isInSafeMode()) {
-        throw new IOException("Safe mode should be turned ON " +
-                              "in order to create namespace image.");
+        throw new IOException("Safe mode should be turned ON "
+            + "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
       success = true;
@@ -4347,7 +4390,7 @@ public class FSNamesystem implements Nam
    * replicas, and calculates the ratio of safe blocks to the total number
    * of blocks in the system, which is the size of blocks in
    * {@link FSNamesystem#blockManager}. When the ratio reaches the
-   * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
+   * {@link #threshold} it starts the SafeModeMonitor daemon in order
    * to monitor whether the safe mode {@link #extension} is passed.
    * Then it leaves safe mode and destroys itself.
    * <p>
@@ -4355,10 +4398,9 @@ public class FSNamesystem implements Nam
    * not tracked because the name node is not intended to leave safe mode
    * automatically in the case.
    *
-   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction)
-   * @see SafeModeMonitor
+   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
    */
-  class SafeModeInfo {
+  public class SafeModeInfo {
     // configuration fields
     /** Safe mode threshold condition %.*/
     private double threshold;
@@ -5100,9 +5142,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Log not rolled", safeMode);
-      }
+      checkNameNodeSafeMode("Log not rolled");
       LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
       return getFSImage().rollEditLog();
     } finally {
@@ -5123,9 +5163,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not started", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not started");
       LOG.info("Start checkpoint for " + backupNode.getAddress());
       cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
@@ -5149,19 +5187,17 @@ public class FSNamesystem implements Nam
   
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.CHECKPOINT);
     boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not ended", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not ended");
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
       success = true;
@@ -5262,7 +5298,8 @@ public class FSNamesystem implements Nam
   /**
    * Get the total number of objects in the system. 
    */
-  long getMaxObjects() {
+  @Override // FSNamesystemMBean
+  public long getMaxObjects() {
     return maxFsObjects;
   }
 
@@ -5407,7 +5444,7 @@ public class FSNamesystem implements Nam
   @Override // FSNamesystemMBean
   public int getNumDecomDeadDataNodes() {
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(dead, null, true);
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
     int deadDecommissioned = 0;
     for (DatanodeDescriptor node : dead) {
       deadDecommissioned += node.isDecommissioned() ? 1 : 0;
@@ -5416,6 +5453,12 @@ public class FSNamesystem implements Nam
   }
 
   @Override // FSNamesystemMBean
+  public int getNumDecommissioningDataNodes() {
+    return getBlockManager().getDatanodeManager().getDecommissioningNodes()
+        .size();
+  }
+
+  @Override // FSNamesystemMBean
   @Metric({"StaleDataNodes", 
     "Number of datanodes marked stale due to delayed heartbeat"})
   public int getNumStaleDataNodes() {
@@ -5513,10 +5556,7 @@ public class FSNamesystem implements Nam
   long nextGenerationStamp(boolean legacyBlock)
       throws IOException, SafeModeException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next generation stamp", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next generation stamp");
 
     long gs;
     if (legacyBlock) {
@@ -5569,12 +5609,9 @@ public class FSNamesystem implements Nam
   /**
    * Increments, logs and then returns the block ID
    */
-  private long nextBlockId() throws SafeModeException {
+  private long nextBlockId() throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next block ID", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next block ID");
     final long blockId = blockIdGenerator.nextValue();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
@@ -5584,10 +5621,8 @@ public class FSNamesystem implements Nam
   private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot get a new generation stamp and an " +
-                                "access token for block " + block, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get a new generation stamp and an "
+        + "access token for block " + block);
     
     // check stored block state
     BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
@@ -5685,11 +5720,11 @@ public class FSNamesystem implements Nam
   void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
       throws IOException {
+    checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -5700,9 +5735,7 @@ public class FSNamesystem implements Nam
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Pipeline not updated", safeMode);
-      }
+      checkNameNodeSafeMode("Pipeline not updated");
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
@@ -5957,9 +5990,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot issue delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot issue delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
           "Delegation Token can be issued only with kerberos or web authentication");
@@ -6004,9 +6035,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
             "Delegation Token can be renewed only with kerberos or web authentication");
@@ -6037,9 +6066,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot cancel delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot cancel delegation token");
       String canceller = getRemoteUser().getUserName();
       DelegationTokenIdentifier id = dtSecretManager
         .cancelToken(token, canceller);
@@ -6265,14 +6292,25 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
     for (DatanodeDescriptor node : live) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("lastContact", getLastContact(node));
-      innerinfo.put("usedSpace", getDfsUsed(node));
-      innerinfo.put("adminState", node.getAdminState().toString());
-      innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
-      innerinfo.put("capacity", node.getCapacity());
-      innerinfo.put("numBlocks", node.numBlocks());
-      innerinfo.put("version", node.getSoftwareVersion());
+      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
+          .put("infoAddr", node.getInfoAddr())
+          .put("infoSecureAddr", node.getInfoSecureAddr())
+          .put("xferaddr", node.getXferAddr())
+          .put("lastContact", getLastContact(node))
+          .put("usedSpace", getDfsUsed(node))
+          .put("adminState", node.getAdminState().toString())
+          .put("nonDfsUsedSpace", node.getNonDfsUsed())
+          .put("capacity", node.getCapacity())
+          .put("numBlocks", node.numBlocks())
+          .put("version", node.getSoftwareVersion())
+          .put("used", node.getDfsUsed())
+          .put("remaining", node.getRemaining())
+          .put("blockScheduled", node.getBlocksScheduled())
+          .put("blockPoolUsed", node.getBlockPoolUsed())
+          .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
+          .put("volfails", node.getVolumeFailures())
+          .build();
+
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
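
Besides adding fields, switching from HashMap to Guava's ImmutableMap.builder() changes the contract of the per-node maps handed to JSON.toString: they are unmodifiable once built, and the builder rejects null keys or values and duplicate keys where HashMap would silently accept them. A self-contained illustration with sample values:

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;

    public class ImmutableMapDemo {
      public static void main(String[] args) {
        Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
            .put("lastContact", 12L)          // sample values, not live metrics
            .put("adminState", "In Service")
            .build();
        // unlike HashMap: innerinfo.put(...) now throws
        // UnsupportedOperationException, and a null or duplicate key would
        // already have failed while building
        System.out.println(innerinfo);
      }
    }
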
@@ -6289,9 +6327,11 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
     for (DatanodeDescriptor node : dead) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("lastContact", getLastContact(node));
-      innerinfo.put("decommissioned", node.isDecommissioned());
+      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
+          .put("lastContact", getLastContact(node))
+          .put("decommissioned", node.isDecommissioned())
+          .put("xferaddr", node.getXferAddr())
+          .build();
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -6308,13 +6348,16 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
         ).getDecommissioningNodes();
     for (DatanodeDescriptor node : decomNodeList) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("underReplicatedBlocks", node.decommissioningStatus
-          .getUnderReplicatedBlocks());
-      innerinfo.put("decommissionOnlyReplicas", node.decommissioningStatus
-          .getDecommissionOnlyReplicas());
-      innerinfo.put("underReplicateInOpenFiles", node.decommissioningStatus
-          .getUnderReplicatedInOpenFiles());
+      Map<String, Object> innerinfo = ImmutableMap
+          .<String, Object> builder()
+          .put("xferaddr", node.getXferAddr())
+          .put("underReplicatedBlocks",
+              node.decommissioningStatus.getUnderReplicatedBlocks())
+          .put("decommissionOnlyReplicas",
+              node.decommissioningStatus.getDecommissionOnlyReplicas())
+          .put("underReplicateInOpenFiles",
+              node.decommissioningStatus.getUnderReplicatedInOpenFiles())
+          .build();
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -6504,11 +6547,17 @@ public class FSNamesystem implements Nam
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
    * @param password Password in the token.
-   * @throws InvalidToken
    */
   public synchronized void verifyToken(DelegationTokenIdentifier identifier,
-      byte[] password) throws InvalidToken {
-    getDelegationTokenSecretManager().verifyToken(identifier, password);
+      byte[] password) throws InvalidToken, RetriableException {
+    try {
+      getDelegationTokenSecretManager().verifyToken(identifier, password);
+    } catch (InvalidToken it) {
+      if (inTransitionToActive()) {
+        throw new RetriableException(it);
+      }
+      throw it;
+    }
   }
   
   @Override
@@ -6526,6 +6575,11 @@ public class FSNamesystem implements Nam
   }
   
   @VisibleForTesting
+  public void setEditLogTailerForTests(EditLogTailer tailer) {
+    this.editLogTailer = tailer;
+  }
+  
+  @VisibleForTesting
   void setFsLockForTests(ReentrantReadWriteLock lock) {
     this.fsLock = lock;
   }
@@ -6560,10 +6614,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot allow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot allow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6588,10 +6639,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot disallow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6618,20 +6666,18 @@ public class FSNamesystem implements Nam
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (String) cacheEntry.getPayload();
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create snapshot for "
-            + snapshotRoot, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
       if (isPermissionEnabled) {
         checkOwner(pc, snapshotRoot);
       }
@@ -6670,19 +6716,17 @@ public class FSNamesystem implements Nam
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename snapshot for " + path);
       if (isPermissionEnabled) {
         checkOwner(pc, path);
       }
@@ -6715,10 +6759,10 @@ public class FSNamesystem implements Nam
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException {
     SnapshottableDirectoryStatus[] status = null;
+    final FSPermissionChecker checker = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      FSPermissionChecker checker = getPermissionChecker();
       final String user = checker.isSuperUser()? null : checker.getUser();
       status = snapshotManager.getSnapshottableDirListing(user);
     } finally {
@@ -6786,21 +6830,21 @@ public class FSNamesystem implements Nam
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
     boolean success = false;
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot delete snapshot for " + snapshotRoot, safeMode);
+      checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
+      if (isPermissionEnabled) {
+        checkOwner(pc, snapshotRoot);
       }
-      checkOwner(pc, snapshotRoot);
 
       BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
       List<INode> removedINodes = new ChunkedArrayList<INode>();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Wed Oct 16 21:07:28 2013
@@ -57,9 +57,14 @@ public class FileChecksumServlets {
       final String hostname = host instanceof DatanodeInfo 
           ? ((DatanodeInfo)host).getHostName() : host.getIpAddr();
       final String scheme = request.getScheme();
-      final int port = "https".equals(scheme)
-          ? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY)
-          : host.getInfoPort();
+      int port = host.getInfoPort();
+      if ("https".equals(scheme)) {
+        final Integer portObject = (Integer) getServletContext().getAttribute(
+            DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
+        if (portObject != null) {
+          port = portObject;
+        }
+      }
       final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum");
 
       String dtParam = "";
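
The old expression cast the servlet-context attribute straight to Integer and auto-unboxed it, so a context without the DFS_DATANODE_HTTPS_PORT_KEY attribute set meant a NullPointerException on HTTPS requests; the rewrite falls back to the datanode's info port. The failure mode, reduced to a sketch:

    static int resolvePort(Integer portObject, int infoPort) {
      // 'int port = portObject;' would NPE when the attribute is absent;
      // guard the unboxing and fall back instead
      return (portObject != null) ? portObject : infoPort;
    }

The same fix is applied to FileDataServlet below.
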

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Wed Oct 16 21:07:28 2013
@@ -61,9 +61,14 @@ public class FileDataServlet extends Dfs
     } else {
       hostname = host.getIpAddr();
     }
-    final int port = "https".equals(scheme)
-      ? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY)
-      : host.getInfoPort();
+    int port = host.getInfoPort();
+    if ("https".equals(scheme)) {
+      final Integer portObject = (Integer) getServletContext().getAttribute(
+          DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
+      if (portObject != null) {
+        port = portObject;
+      }
+    }
 
     String dtParam = "";
     if (dt != null) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Wed Oct 16 21:07:28 2013
@@ -52,6 +52,7 @@ public class NameNodeHttpServer {
   private final NameNode nn;
   
   private InetSocketAddress httpAddress;
+  private InetSocketAddress httpsAddress;
   private InetSocketAddress bindAddress;
   
   public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
@@ -99,14 +100,15 @@ public class NameNodeHttpServer {
     boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
     if (certSSL) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
-      InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoHost + ":" + conf.get(
-        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, "0"));
+      httpsAddress = NetUtils.createSocketAddr(conf.get(
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
+
       Configuration sslConf = new Configuration(false);
-      if (certSSL) {
-        sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
-                                     "ssl-server.xml"));
-      }
-      httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
+      sslConf.addResource(conf.get(
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+      httpServer.addSslListener(httpsAddress, sslConf, needClientAuth);
       // assume same ssl port for all datanodes
       InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
         DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475));
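
The HTTPS listener now binds to the complete dfs.namenode.https-address setting (host and port together) instead of splicing the HTTP host onto a separate https port key. A hedged sketch of the address derivation using the same Configuration/NetUtils calls; the key and default literal shown here are assumed to mirror DFS_NAMENODE_HTTPS_ADDRESS_KEY and its shipped default:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetUtils;

    static InetSocketAddress httpsBindAddress(Configuration conf) {
      return NetUtils.createSocketAddr(
          conf.get("dfs.namenode.https-address", "0.0.0.0:50470"));
    }
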
@@ -163,6 +165,10 @@ public class NameNodeHttpServer {
     return httpAddress;
   }
 
+  public InetSocketAddress getHttpsAddress() {
+    return httpsAddress;
+  }
+
   /**
    * Sets fsimage for use by servlets.
    * 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Oct 16 21:07:28 2013
@@ -28,12 +28,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URLEncoder;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -211,6 +206,9 @@ class NamenodeJspHelper {
 
   static void generateSnapshotReport(JspWriter out, FSNamesystem fsn)
       throws IOException {
+    if (fsn == null) {
+      return;
+    }
     out.println("<div id=\"snapshotstats\"><div class=\"dfstable\">"
         + "<table class=\"storage\" title=\"Snapshot Summary\">\n"
         + "<thead><tr><td><b>Snapshottable directories</b></td>"
@@ -653,25 +651,22 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     // We can't redirect if there isn't a DN to redirect to.
     // Lets instead show a proper error message.
-    if (nn.getNamesystem().getNumLiveDataNodes() < 1) {
+    FSNamesystem fsn = nn.getNamesystem();
+
+    DatanodeID datanode = null;
+    if (fsn != null && fsn.getNumLiveDataNodes() >= 1) {
+      datanode = getRandomDatanode(nn);
+    }
+
+    if (datanode == null) {
       throw new IOException("Can't browse the DFS since there are no " +
           "live nodes available to redirect to.");
     }
-    final DatanodeID datanode = getRandomDatanode(nn);;
+
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
+    // if the user is defined, get a delegation token and stringify it
     String tokenString = getDelegationToken(
         nn.getRpcServer(), request, conf, ugi);
-    // if the user is defined, get a delegation token and stringify it
-    final String redirectLocation;
-    final String nodeToRedirect;
-    int redirectPort;
-    if (datanode != null) {
-      nodeToRedirect = datanode.getIpAddr();
-      redirectPort = datanode.getInfoPort();
-    } else {
-      nodeToRedirect = nn.getHttpAddress().getHostName();
-      redirectPort = nn.getHttpAddress().getPort();
-    }
 
     InetSocketAddress rpcAddr = nn.getNameNodeAddress();
     String rpcHost = rpcAddr.getAddress().isAnyLocalAddress()
@@ -679,16 +674,31 @@ class NamenodeJspHelper {
       : rpcAddr.getAddress().getHostAddress();
     String addr = rpcHost + ":" + rpcAddr.getPort();
 
-    String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();
-    redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" + redirectPort
+    final String redirectLocation =
+        JspHelper.Url.url(request.getScheme(), datanode)
         + "/browseDirectory.jsp?namenodeInfoPort="
-        + nn.getHttpAddress().getPort() + "&dir=/"
+        + request.getServerPort() + "&dir=/"
         + (tokenString == null ? "" :
            JspHelper.getDelegationTokenUrlParam(tokenString))
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
+
     resp.sendRedirect(redirectLocation);
   }
 
+  /**
+   * Returns a descriptive label for the running NameNode.  If the NameNode has
+   * initialized to the point of running its RPC server, then this label consists
+   * of the host and port of the RPC server.  Otherwise, the label is a message
+   * stating that the NameNode is still initializing.
+   * 
+   * @param nn NameNode to describe
+   * @return String NameNode label
+   */
+  static String getNameNodeLabel(NameNode nn) {
+    return nn.getRpcServer() != null ? nn.getNameNodeAddressHostPortString() :
+      "initializing";
+  }
+
   static class NodeListJsp {
     private int rowNum = 0;
 
@@ -726,12 +736,11 @@ class NamenodeJspHelper {
     }
 
     private void generateNodeDataHeader(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
       // from nn_browsedfscontent.jsp:
-      String url = HttpConfig.getSchemePrefix() + d.getHostName() + ":"
-          + d.getInfoPort()
-          + "/browseDirectory.jsp?namenodeInfoPort=" + nnHttpPort + "&dir="
+      String url = "///" + JspHelper.Url.authority(scheme, d)
+          + "/browseDirectory.jsp?namenodeInfoPort=" + nnInfoPort + "&dir="
           + URLEncoder.encode("/", "UTF-8")
           + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnaddr);
 
@@ -748,9 +757,9 @@ class NamenodeJspHelper {
     }
 
     void generateDecommissioningNodeData(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
       if (!alive) {
         return;
       }
@@ -774,7 +783,7 @@ class NamenodeJspHelper {
     }
     
     void generateNodeData(JspWriter out, DatanodeDescriptor d, String suffix,
-        boolean alive, int nnHttpPort, String nnaddr) throws IOException {
+        boolean alive, int nnInfoPort, String nnaddr, String scheme) throws IOException {
       /*
        * Say the datanode is dn1.hadoop.apache.org with ip 192.168.0.5 we use:
        * 1) d.getHostName():d.getPort() to display. Domain and port are stripped
@@ -786,7 +795,7 @@ class NamenodeJspHelper {
        * interact with datanodes.
        */
 
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
       long currentTime = Time.now();
       long timestamp = d.getLastUpdate();
       if (!alive) {
@@ -844,17 +853,17 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
       final FSNamesystem ns = nn.getNamesystem();
+      if (ns == null) {
+        return;
+      }
       final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
       final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
       final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       dm.fetchDatanodes(live, dead, true);
 
-      InetSocketAddress nnSocketAddress =
-          (InetSocketAddress)context.getAttribute(
-              NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
-      String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
-          + nnSocketAddress.getPort();
+      String nnaddr = nn.getServiceRpcAddress().getAddress().getHostName() + ":"
+          + nn.getServiceRpcAddress().getPort();
 
       whatNodes = request.getParameter("whatNodes"); // show only live or only
                                                      // dead nodes
@@ -890,16 +899,11 @@ class NamenodeJspHelper {
 
       counterReset();
 
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-
       if (live.isEmpty() && dead.isEmpty()) {
         out.print("There are no datanodes in the cluster");
       } else {
 
-        int nnHttpPort = nn.getHttpAddress().getPort();
+        int nnInfoPort = request.getServerPort();
         out.print("<div id=\"dfsnodetable\"> ");
         if (whatNodes.equals("LIVE")) {
           out.print("<a name=\"LiveNodes\" id=\"title\">" + "Live Datanodes : "
@@ -941,8 +945,8 @@ class NamenodeJspHelper {
 
             JspHelper.sortNodeList(live, sorterField, sorterOrder);
             for (int i = 0; i < live.size(); i++) {
-              generateNodeData(out, live.get(i), port_suffix, true, nnHttpPort,
-                  nnaddr);
+              generateNodeData(out, live.get(i), port_suffix, true, nnInfoPort,
+                  nnaddr, request.getScheme());
             }
           }
           out.print("</table>\n");
@@ -964,7 +968,7 @@ class NamenodeJspHelper {
             JspHelper.sortNodeList(dead, sorterField, sorterOrder);
             for (int i = 0; i < dead.size(); i++) {
               generateNodeData(out, dead.get(i), port_suffix, false,
-                  nnHttpPort, nnaddr);
+                  nnInfoPort, nnaddr, request.getScheme());
             }
 
             out.print("</table>\n");
@@ -995,7 +999,7 @@ class NamenodeJspHelper {
             JspHelper.sortNodeList(decommissioning, "name", "ASC");
             for (int i = 0; i < decommissioning.size(); i++) {
               generateDecommissioningNodeData(out, decommissioning.get(i),
-                  port_suffix, true, nnHttpPort, nnaddr);
+                  port_suffix, true, nnInfoPort, nnaddr, request.getScheme());
             }
             out.print("</table>\n");
           }
@@ -1023,14 +1027,16 @@ class NamenodeJspHelper {
     final BlockManager blockManager;
     
     XMLBlockInfo(FSNamesystem fsn, Long blockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
 
       if (blockId == null) {
         this.block = null;
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = ((INode)blockManager.getBlockCollection(block)).asFile();
+        this.inode = blockManager != null ?
+          ((INode)blockManager.getBlockCollection(block)).asFile() :
+          null;
       }
     }
 
@@ -1104,7 +1110,9 @@ class NamenodeJspHelper {
         } 
 
         doc.startTag("replicas");
-        for(DatanodeStorageInfo storage : blockManager.getStorages(block)) {
+        for(DatanodeStorageInfo storage : (blockManager != null ?
+                blockManager.getStorages(block) :
+                Collections.<DatanodeStorageInfo>emptyList())) {
           doc.startTag("replica");
 
           DatanodeDescriptor dd = storage.getDatanodeDescriptor();
@@ -1140,7 +1148,7 @@ class NamenodeJspHelper {
     
     XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
                                int numCorruptBlocks, Long startingBlockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
       this.conf = conf;
       this.numCorruptBlocks = numCorruptBlocks;
       this.startingBlockId = startingBlockId;
@@ -1163,16 +1171,19 @@ class NamenodeJspHelper {
       doc.endTag();
       
       doc.startTag("num_missing_blocks");
-      doc.pcdata(""+blockManager.getMissingBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getMissingBlocksCount() : 0));
       doc.endTag();
       
       doc.startTag("num_corrupt_replica_blocks");
-      doc.pcdata(""+blockManager.getCorruptReplicaBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getCorruptReplicaBlocksCount() : 0));
       doc.endTag();
      
       doc.startTag("corrupt_replica_block_ids");
-      final long[] corruptBlockIds = blockManager.getCorruptReplicaBlockIds(
-          numCorruptBlocks, startingBlockId);
+      final long[] corruptBlockIds = blockManager != null ?
+        blockManager.getCorruptReplicaBlockIds(numCorruptBlocks,
+        startingBlockId) : null;
       if (corruptBlockIds != null) {
         for (Long blockId: corruptBlockIds) {
           doc.startTag("block_id");

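Taken together, the NamenodeJspHelper hunks above make the JSP rendering tolerate an FSNamesystem that is null (for instance while the NameNode is still starting up). A minimal sketch of the guard pattern, reusing the names from the hunks; fsn and block are assumed to be in scope, and this is illustrative, not code from the patch:

    // Fall back to null/empty values when the namesystem is absent.
    BlockManager blockManager = (fsn != null) ? fsn.getBlockManager() : null;
    long missingBlocks = (blockManager != null)
        ? blockManager.getMissingBlocksCount() : 0;
    Iterable<DatanodeStorageInfo> replicas = (blockManager != null)
        ? blockManager.getStorages(block)
        : Collections.<DatanodeStorageInfo>emptyList();
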
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Wed Oct 16 21:07:28 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
@@ -43,4 +44,6 @@ public interface Namesystem extends RwLo
   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
   public void checkOperation(OperationCategory read) throws StandbyException;
+
+  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java Wed Oct 16 21:07:28 2013
@@ -33,10 +33,7 @@ import org.apache.hadoop.classification.
 public class SafeModeException extends IOException {
   private static final long serialVersionUID = 1L;
 
-  public SafeModeException() {}
-
   public SafeModeException(String text, FSNamesystem.SafeModeInfo mode ) {
     super(text + ". Name node is in safe mode.\n" + mode.getTurnOffTip());
   }
-
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java Wed Oct 16 21:07:28 2013
@@ -44,6 +44,7 @@ public class StartupProgressServlet exte
   private static final String ELAPSED_TIME = "elapsedTime";
   private static final String FILE = "file";
   private static final String NAME = "name";
+  private static final String DESC = "desc";
   private static final String PERCENT_COMPLETE = "percentComplete";
   private static final String PHASES = "phases";
   private static final String SIZE = "size";
@@ -70,6 +71,7 @@ public class StartupProgressServlet exte
       for (Phase phase: view.getPhases()) {
         json.writeStartObject();
         json.writeStringField(NAME, phase.getName());
+        json.writeStringField(DESC, phase.getDescription());
         json.writeStringField(STATUS, view.getStatus(phase).toString());
         json.writeNumberField(PERCENT_COMPLETE, view.getPercentComplete(phase));
         json.writeNumberField(ELAPSED_TIME, view.getElapsedTime(phase));
@@ -80,8 +82,10 @@ public class StartupProgressServlet exte
         for (Step step: view.getSteps(phase)) {
           json.writeStartObject();
           StepType type = step.getType();
-          String name = type != null ? type.getName() : null;
-          writeStringFieldIfNotNull(json, NAME, name);
+          if (type != null) {
+            json.writeStringField(NAME, type.getName());
+            json.writeStringField(DESC, type.getDescription());
+          }
           json.writeNumberField(COUNT, view.getCount(phase, step));
           writeStringFieldIfNotNull(json, FILE, step.getFile());
           writeNumberFieldIfDefined(json, SIZE, step.getSize());

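The net effect on the servlet's JSON output is that every phase, and every step with a known type, now carries a desc field alongside name. A hedged fragment of the writer side, mirroring the hunk (json, view, and phase are the servlet's own locals; NAME, DESC, STATUS are the constants above):

    json.writeStartObject();
    json.writeStringField(NAME, phase.getName());
    json.writeStringField(DESC, phase.getDescription());   // newly emitted
    json.writeStringField(STATUS, view.getStatus(phase).toString());
    // ... per-step objects follow, each gaining NAME/DESC when type != null
    json.writeEndObject();
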
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Wed Oct 16 21:07:28 2013
@@ -132,7 +132,17 @@ public interface FSNamesystemMBean {
   public int getNumDecomDeadDataNodes();
 
   /**
+   * Number of data nodes that are in the decommissioning state
+   */
+  public int getNumDecommissioningDataNodes();
+
+  /**
    * The statistics of snapshots
    */
   public String getSnapshotStats();
+
+  /**
+   * Return the maximum number of inodes in the file system
+   */
+  public long getMaxObjects();
 }

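The two new bean methods surface as JMX attributes. A small sketch of reading them; the ObjectName is the one the NameNode conventionally registers for this bean ("FSNamesystemState"), which is an assumption here, not part of the patch:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Runs inside the NameNode JVM (or adapt to a remote JMXConnector).
    public final class FsnMBeanProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName fsnState = new ObjectName(
            "Hadoop:service=NameNode,name=FSNamesystemState");
        // JMX attribute names are the getter names minus the "get" prefix.
        System.out.println("NumDecommissioningDataNodes = "
            + mbs.getAttribute(fsnState, "NumDecommissioningDataNodes"));
        System.out.println("MaxObjects = "
            + mbs.getAttribute(fsnState, "MaxObjects"));
      }
    }
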
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java Wed Oct 16 21:07:28 2013
@@ -37,7 +37,12 @@ public class DisallowedDatanodeException
   /** for java.io.Serializable */
   private static final long serialVersionUID = 1L;
 
+  public DisallowedDatanodeException(DatanodeID nodeID, String reason) {
+    super("Datanode denied communication with namenode because "
+        + reason + ": " + nodeID);
+  }
+
   public DisallowedDatanodeException(DatanodeID nodeID) {
-    super("Datanode denied communication with namenode: " + nodeID);
+    this(nodeID, "the host is not in the include-list");
   }
 }

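A hypothetical call site for the new two-argument constructor; the condition and reason string here are invented for illustration, while the one-argument constructor keeps the old message for existing callers:

    if (!dnAddressResolvable) {  // hypothetical condition
      throw new DisallowedDatanodeException(nodeID,
          "the host's address cannot be resolved to a hostname");
    }
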
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java Wed Oct 16 21:07:28 2013
@@ -87,7 +87,7 @@ public class LightWeightHashSet<T> imple
    *
    * @see ConcurrentModificationException
    */
-  protected volatile int modification = 0;
+  protected int modification = 0;
 
   private float maxLoadFactor;
   private float minLoadFactor;

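Dropping volatile here reads as an acknowledgment that LightWeightHashSet is not meant for concurrent use; the counter only backs fail-fast iteration. A sketch of the pattern it supports (illustrative; the iterator internals shown are assumptions, not the class's actual code):

    // Captured when an iterator is created:
    final int expectedModification = modification;
    // Re-checked on each next()/remove():
    if (modification != expectedModification) {
      throw new ConcurrentModificationException();
    }
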
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Wed Oct 16 21:07:28 2013
@@ -17,29 +17,11 @@
  */
 package org.apache.hadoop.hdfs.web;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
@@ -50,6 +32,11 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.StringUtils;
 import org.mortbay.util.ajax.JSON;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.*;
+
 /** JSON Utilities */
 public class JsonUtil {
   private static final Object[] EMPTY_OBJECT_ARRAY = {};
@@ -296,6 +283,7 @@ public class JsonUtil {
     m.put("storageID", datanodeinfo.getDatanodeUuid());
     m.put("xferPort", datanodeinfo.getXferPort());
     m.put("infoPort", datanodeinfo.getInfoPort());
+    m.put("infoSecurePort", datanodeinfo.getInfoSecurePort());
     m.put("ipcPort", datanodeinfo.getIpcPort());
 
     m.put("capacity", datanodeinfo.getCapacity());
@@ -322,6 +310,7 @@ public class JsonUtil {
         (String)m.get("storageID"),
         (int)(long)(Long)m.get("xferPort"),
         (int)(long)(Long)m.get("infoPort"),
+        (int)(long)(Long)m.get("infoSecurePort"),
         (int)(long)(Long)m.get("ipcPort"),
 
         (Long)m.get("capacity"),

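A condensed view of the round trip these two hunks implement; m and datanodeinfo are the method locals seen above:

    // Write side: one more entry in the JSON map.
    m.put("infoSecurePort", datanodeinfo.getInfoSecurePort());
    // Read side: JSON numbers come back as Long, hence the unboxing chain.
    final int infoSecurePort = (int)(long)(Long)m.get("infoSecurePort");

Note that the read side assumes the key is present; a map produced by a sender without this change would yield null and fail the cast, so both peers presumably upgrade together.
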
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Wed Oct 16 21:07:28 2013
@@ -55,8 +55,9 @@ message DatanodeIDProto {
                                         // as the original StorageID of the
                                         // Datanode.
   required uint32 xferPort = 4;  // data streaming port
-  required uint32 infoPort = 5;  // info server port
+  required uint32 infoPort = 5;  // datanode http port
   required uint32 ipcPort = 6;   // ipc server port
+  optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
 }
 
 /**

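Declaring the new field optional with an explicit default keeps DatanodeIDProto wire-compatible in both directions. A hedged sketch against the generated Java class (standard protobuf codegen names are assumed):

    // A message from a sender that predates field 7 still parses; the
    // accessor then reports the declared default.
    HdfsProtos.DatanodeIDProto id =
        HdfsProtos.DatanodeIDProto.parseFrom(bytes);  // 'bytes' assumed in scope
    int securePort = id.getInfoSecurePort();          // 0 when field 7 is absent
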
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Oct 16 21:07:28 2013
@@ -570,6 +570,22 @@
 </property>
 
 <property>
+  <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
+  <value>true</value>
+  <description>
+    If true (the default), then the namenode requires that a connecting
+    datanode's address resolve to a hostname.  If necessary, a reverse
+    DNS lookup is performed.  All attempts to register a datanode from an
+    unresolvable address are rejected.
+
+    It is recommended that this setting be left on to prevent accidental
+    registration of datanodes listed by hostname in the excludes file during a
+    DNS outage.  Only set this to false in environments where there is no
+    infrastructure to support reverse DNS lookup.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.decommission.interval</name>
   <value>30</value>
   <description>Namenode periodicity in seconds to check if decommission is 

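On the NameNode side this surfaces as an ordinary boolean key; a minimal sketch using the literal property name rather than assuming a DFSConfigKeys constant:

    Configuration conf = new HdfsConfiguration();
    boolean checkIpHostname = conf.getBoolean(
        "dfs.namenode.datanode.registration.ip-hostname-check", true);
    // When true, registrations from addresses that fail reverse DNS are refused.
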
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1527684-1532876

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp Wed Oct 16 21:07:28 2013
@@ -25,6 +25,7 @@
 	import="org.apache.hadoop.fs.Path"
 	import="org.apache.hadoop.ha.HAServiceProtocol.HAServiceState"
 	import="java.util.Collection"
+	import="java.util.Collections"
 	import="java.util.Arrays" %>
 <%!//for java.io.Serializable
   private static final long serialVersionUID = 1L;%>
@@ -34,9 +35,10 @@
   HAServiceState nnHAState = nn.getServiceState();
   boolean isActive = (nnHAState == HAServiceState.ACTIVE);
   String namenodeRole = nn.getRole().toString();
-  String namenodeLabel = nn.getNameNodeAddressHostPortString();
-  Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = 
-	fsn.listCorruptFileBlocks("/", null);
+  String namenodeLabel = NamenodeJspHelper.getNameNodeLabel(nn);
+  Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = fsn != null ?
+    fsn.listCorruptFileBlocks("/", null) :
+    Collections.<FSNamesystem.CorruptFileBlockInfo>emptyList();
   int corruptFileCount = corruptFileBlocks.size();
 %>
 
@@ -48,7 +50,7 @@
 <h1><%=namenodeRole%> '<%=namenodeLabel%>'</h1>
 <%=NamenodeJspHelper.getVersionTable(fsn)%>
 <br>
-<% if (isActive) { %> 
+<% if (isActive && fsn != null) { %> 
   <b><a href="/nn_browsedfscontent.jsp">Browse the filesystem</a></b>
   <br>
 <% } %> 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Wed Oct 16 21:07:28 2013
@@ -34,29 +34,20 @@
   boolean isActive = (nnHAState == HAServiceState.ACTIVE);
   String namenodeRole = nn.getRole().toString();
   String namenodeState = nnHAState.toString();
-  String namenodeLabel = nn.getRpcServer() != null ?
-    nn.getNameNodeAddressHostPortString() : null;
+  String namenodeLabel = NamenodeJspHelper.getNameNodeLabel(nn);
 %>
 
 <!DOCTYPE html>
 <html>
 <head>
 <link rel="stylesheet" type="text/css" href="/static/hadoop.css">
-<% if (namenodeLabel != null) { %>
 <title>Hadoop <%=namenodeRole%>&nbsp;<%=namenodeLabel%></title>
-<% } else { %>
-<title>Hadoop <%=namenodeRole%></title>
-<% } %>
 </head>    
 <body>
-<% if (namenodeLabel != null) { %>
 <h1><%=namenodeRole%> '<%=namenodeLabel%>' (<%=namenodeState%>)</h1>
-<% } else { %>
-<h1><%=namenodeRole%> (<%=namenodeState%>)</h1>
-<% } %>
 <%= NamenodeJspHelper.getVersionTable(fsn) %>
 <br />
-<% if (isActive) { %> 
+<% if (isActive && fsn != null) { %> 
   <b><a href="/nn_browsedfscontent.jsp">Browse the filesystem</a></b><br>
 <% } %> 
 <b><a href="/logs/"><%=namenodeRole%> Logs</a></b>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp Wed Oct 16 21:07:28 2013
@@ -33,7 +33,7 @@ String namenodeRole = nn.getRole().toStr
 FSNamesystem fsn = nn.getNamesystem();
 HAServiceState nnHAState = nn.getServiceState();
 boolean isActive = (nnHAState == HAServiceState.ACTIVE);
-String namenodeLabel = nn.getNameNodeAddressHostPortString();
+String namenodeLabel = NamenodeJspHelper.getNameNodeLabel(nn);
 %>
 
 <!DOCTYPE html>
@@ -46,7 +46,7 @@ String namenodeLabel = nn.getNameNodeAdd
 <h1><%=namenodeRole%> '<%=namenodeLabel%>'</h1>
 <%= NamenodeJspHelper.getVersionTable(fsn) %>
 <br />
-<% if (isActive) { %> 
+<% if (isActive && fsn != null) { %> 
   <b><a href="/nn_browsedfscontent.jsp">Browse the filesystem</a></b><br>
 <% } %> 
 <b><a href="/logs/"><%=namenodeRole%> Logs</a></b><br>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Oct 16 21:07:28 2013
@@ -18,40 +18,12 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Options.Rename;
@@ -59,13 +31,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -89,8 +56,15 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
+import java.io.*;
+import java.net.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.junit.Assert.assertEquals;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -110,10 +84,10 @@ public class DFSTestUtil {
   
   /** Creates a new instance of DFSTestUtil
    *
-   * @param testName Name of the test from where this utility is used
    * @param nFiles Number of files to be created
    * @param maxLevels Maximum number of directory levels
    * @param maxSize Maximum size for file
+   * @param minSize Minimum size for file
    */
   private DFSTestUtil(int nFiles, int maxLevels, int maxSize, int minSize) {
     this.nFiles = nFiles;
@@ -139,7 +113,7 @@ public class DFSTestUtil {
   }
   
   /**
-   * when formating a namenode - we must provide clusterid.
+   * when formatting a namenode - we must provide clusterid.
    * @param conf
    * @throws IOException
    */
@@ -803,6 +777,7 @@ public class DFSTestUtil {
         UUID.randomUUID().toString(),
         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+        DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
   }
 
@@ -812,7 +787,8 @@ public class DFSTestUtil {
 
   public static DatanodeID getLocalDatanodeID(int port) {
     return new DatanodeID("127.0.0.1", "localhost",
-        UUID.randomUUID().toString(), port, port, port);
+        UUID.randomUUID().toString(),
+        port, port, port, port);
   }
 
   public static DatanodeDescriptor getLocalDatanodeDescriptor() {
@@ -836,6 +812,7 @@ public class DFSTestUtil {
     return new DatanodeInfo(new DatanodeID(ipAddr, host,
         UUID.randomUUID().toString(), port,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+        DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT));
   }
 
@@ -844,6 +821,7 @@ public class DFSTestUtil {
     return new DatanodeInfo(ipAddr, hostname, "",
         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+        DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
         1, 2, 3, 4, 5, 6, "local", adminState);
   }
@@ -892,6 +870,7 @@ public class DFSTestUtil {
     DatanodeID dnId = new DatanodeID(ipAddr, "host",
         UUID.randomUUID().toString(), port,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+        DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
     return new DatanodeDescriptor(dnId, rackLocation);
   }
@@ -1055,4 +1034,8 @@ public class DFSTestUtil {
           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
   }
+
+  public static void abortStream(DFSOutputStream out) throws IOException {
+    out.abort();
+  }
 }

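The mechanical changes in this file all track the widened DatanodeID constructor, which now takes the secure (https) info port between the http and ipc ports. An illustrative construction with made-up port values:

    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
        UUID.randomUUID().toString(),
        50010,   // xferPort
        50075,   // infoPort
        50475,   // infoSecurePort (the new argument)
        50020);  // ipcPort
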
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java Wed Oct 16 21:07:28 2013
@@ -17,13 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import java.net.InetSocketAddress;
-import java.security.Permission;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +33,13 @@ import org.apache.hadoop.test.GenericTes
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 
+import java.net.InetSocketAddress;
+import java.security.Permission;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 /**
  * This class tests data node registration.
  */
@@ -157,7 +157,8 @@ public class TestDatanodeRegistration {
     final String DN_HOSTNAME = "localhost";
     final int DN_XFER_PORT = 12345;
     final int DN_INFO_PORT = 12346;
-    final int DN_IPC_PORT = 12347;
+    final int DN_INFO_SECURE_PORT = 12347;
+    final int DN_IPC_PORT = 12348;
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     try {
@@ -172,7 +173,8 @@ public class TestDatanodeRegistration {
 
       // register a datanode
       DatanodeID dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
-          "fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
+          "fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT, DN_INFO_SECURE_PORT,
+          DN_IPC_PORT);
       long nnCTime = cluster.getNamesystem().getFSImage().getStorage()
           .getCTime();
       StorageInfo mockStorageInfo = mock(StorageInfo.class);
@@ -188,7 +190,8 @@ public class TestDatanodeRegistration {
 
       // register the same datanode again with a different storage ID
       dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
-          "changed-fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
+          "changed-fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT,
+          DN_INFO_SECURE_PORT, DN_IPC_PORT);
       dnReg = new DatanodeRegistration(dnId,
           mockStorageInfo, null, VersionInfo.getVersion());
       rpcServer.registerDatanode(dnReg);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java Wed Oct 16 21:07:28 2013
@@ -370,13 +370,20 @@ public class TestDecommission {
       for (int i = 0; i < numNamenodes; i++) {
         ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
         FileSystem fileSys = cluster.getFileSystem(i);
+        FSNamesystem ns = cluster.getNamesystem(i);
+
         writeFile(fileSys, file1, replicas);
-        
+
+        int deadDecommissioned = ns.getNumDecomDeadDataNodes();
+        int liveDecommissioned = ns.getNumDecomLiveDataNodes();
+
         // Decommission one node. Verify that node is decommissioned.
         DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
             AdminStates.DECOMMISSIONED);
         decommissionedNodes.add(decomNode);
-        
+        assertEquals(deadDecommissioned, ns.getNumDecomDeadDataNodes());
+        assertEquals(liveDecommissioned + 1, ns.getNumDecomLiveDataNodes());
+
         // Ensure decommissioned datanode is not automatically shutdown
         DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
         assertEquals("All datanodes must be alive", numDatanodes, 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Wed Oct 16 21:07:28 2013
@@ -95,6 +95,19 @@ public class TestDistributedFileSystem {
   }
 
   @Test
+  public void testEmptyDelegationToken() throws IOException {
+    Configuration conf = getTestConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      FileSystem fileSys = cluster.getFileSystem();
+      fileSys.getDelegationToken("");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
   public void testFileSystemCloseAll() throws Exception {
     Configuration conf = getTestConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();