Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/10/17 04:14:36 UTC

svn commit: r1532952 [3/6] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java...

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Oct 17 02:14:33 2013
@@ -18,10 +18,9 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.security.PrivilegedExceptionAction;
@@ -37,9 +36,9 @@ import javax.servlet.jsp.JspWriter;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -47,20 +46,23 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.http.HtmlQuoting;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 @InterfaceAudience.Private
 public class DatanodeJspHelper {
+  private static final int PREV_BLOCK = -1;
+  private static final int NEXT_BLOCK = 1;
+
   private static DFSClient getDFSClient(final UserGroupInformation user,
                                         final String addr,
                                         final Configuration conf
@@ -143,10 +145,10 @@ public class DatanodeJspHelper {
           out.print("Empty file");
         } else {
           DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
-          String fqdn = canonicalize(chosenNode.getIpAddr());
           int datanodePort = chosenNode.getXferPort();
-          String redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":"
-              + chosenNode.getInfoPort() + "/browseBlock.jsp?blockId="
+          String redirectLocation = JspHelper.Url.url(req.getScheme(),
+              chosenNode)
+              + "/browseBlock.jsp?blockId="
               + firstBlock.getBlock().getBlockId() + "&blockSize="
               + firstBlock.getBlock().getNumBytes() + "&genstamp="
               + firstBlock.getBlock().getGenerationStamp() + "&filename="
@@ -225,7 +227,7 @@ public class DatanodeJspHelper {
         JspHelper.addTableFooter(out);
       }
     }
-    out.print("<br><a href=\"" + HttpConfig.getSchemePrefix()
+    out.print("<br><a href=\"///"
         + canonicalize(nnAddr) + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
     dfs.close();
@@ -302,8 +304,7 @@ public class DatanodeJspHelper {
         Long.MAX_VALUE).getLocatedBlocks();
     // Add the various links for looking at the file contents
     // URL for downloading the full file
-    String downloadUrl = HttpConfig.getSchemePrefix() + req.getServerName() + ":"
-        + req.getServerPort() + "/streamFile" + ServletUtil.encodePath(filename)
+    String downloadUrl = "/streamFile" + ServletUtil.encodePath(filename)
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr, true)
         + JspHelper.getDelegationTokenUrlParam(tokenString);
     out.print("<a name=\"viewOptions\"></a>");
@@ -319,8 +320,8 @@ public class DatanodeJspHelper {
       dfs.close();
       return;
     }
-    String fqdn = canonicalize(chosenNode.getIpAddr());
-    String tailUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + chosenNode.getInfoPort()
+
+    String tailUrl = "///" + JspHelper.Url.authority(req.getScheme(), chosenNode)
         + "/tail.jsp?filename=" + URLEncoder.encode(filename, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort
         + "&chunkSizeToView=" + chunkSizeToView
@@ -368,8 +369,7 @@ public class DatanodeJspHelper {
       for (int j = 0; j < locs.length; j++) {
         String datanodeAddr = locs[j].getXferAddr();
         datanodePort = locs[j].getXferPort();
-        fqdn = canonicalize(locs[j].getIpAddr());
-        String blockUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + locs[j].getInfoPort()
+        String blockUrl = "///" + JspHelper.Url.authority(req.getScheme(), locs[j])
             + "/browseBlock.jsp?blockId=" + blockidstring
             + "&blockSize=" + blockSize
             + "&filename=" + URLEncoder.encode(filename, "UTF-8")
@@ -380,7 +380,7 @@ public class DatanodeJspHelper {
             + JspHelper.getDelegationTokenUrlParam(tokenString)
             + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
 
-        String blockInfoUrl = HttpConfig.getSchemePrefix() + nnCanonicalName + ":"
+        String blockInfoUrl = "///" + nnCanonicalName + ":"
             + namenodeInfoPort
             + "/block_info_xml.jsp?blockId=" + blockidstring;
         out.print("<td>&nbsp</td><td><a href=\"" + blockUrl + "\">"
@@ -391,7 +391,7 @@ public class DatanodeJspHelper {
     }
     out.println("</table>");
     out.print("<hr>");
-    out.print("<br><a href=\"" + HttpConfig.getSchemePrefix()
+    out.print("<br><a href=\"///"
         + nnCanonicalName + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
     dfs.close();
@@ -491,9 +491,7 @@ public class DatanodeJspHelper {
     String parent = new File(filename).getParent();
     JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, parent, nnAddr);
     out.print("<hr>");
-    out.print("<a href=\"" + HttpConfig.getSchemePrefix()
-        + req.getServerName() + ":" + req.getServerPort()
-        + "/browseDirectory.jsp?dir=" + URLEncoder.encode(parent, "UTF-8")
+    out.print("<a href=\"/browseDirectory.jsp?dir=" + URLEncoder.encode(parent, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort
         + JspHelper.getDelegationTokenUrlParam(tokenString)
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr)
@@ -501,112 +499,23 @@ public class DatanodeJspHelper {
     out.print("<a href=\"#viewOptions\">Advanced view/download options</a><br>");
     out.print("<hr>");
 
-    // Determine the prev & next blocks
-    long nextStartOffset = 0;
-    long nextBlockSize = 0;
-    String nextBlockIdStr = null;
-    String nextGenStamp = null;
-    String nextHost = req.getServerName();
-    int nextPort = req.getServerPort();
-    int nextDatanodePort = datanodePort;
-    // determine data for the next link
-    if (startOffset + chunkSizeToView >= blockSize) {
-      // we have to go to the next block from this point onwards
-      List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
-          Long.MAX_VALUE).getLocatedBlocks();
-      for (int i = 0; i < blocks.size(); i++) {
-        if (blocks.get(i).getBlock().getBlockId() == blockId) {
-          if (i != blocks.size() - 1) {
-            LocatedBlock nextBlock = blocks.get(i + 1);
-            nextBlockIdStr = Long.toString(nextBlock.getBlock().getBlockId());
-            nextGenStamp = Long.toString(nextBlock.getBlock()
-                .getGenerationStamp());
-            nextStartOffset = 0;
-            nextBlockSize = nextBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
-            nextDatanodePort = d.getXferPort();
-            nextHost = d.getIpAddr();
-            nextPort = d.getInfoPort();
-          }
-        }
-      }
-    } else {
-      // we are in the same block
-      nextBlockIdStr = blockId.toString();
-      nextStartOffset = startOffset + chunkSizeToView;
-      nextBlockSize = blockSize;
-      nextGenStamp = genStamp.toString();
-    }
-    String nextUrl = null;
-    if (nextBlockIdStr != null) {
-      nextUrl = HttpConfig.getSchemePrefix() + canonicalize(nextHost) + ":" + nextPort
-          + "/browseBlock.jsp?blockId=" + nextBlockIdStr
-          + "&blockSize=" + nextBlockSize
-          + "&startOffset=" + nextStartOffset
-          + "&genstamp=" + nextGenStamp
-          + "&filename=" + URLEncoder.encode(filename, "UTF-8")
-          + "&chunkSizeToView=" + chunkSizeToView
-          + "&datanodePort=" + nextDatanodePort
-          + "&namenodeInfoPort=" + namenodeInfoPort
-          + JspHelper.getDelegationTokenUrlParam(tokenString)
-          + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
+    String authority = req.getServerName() + ":" + req.getServerPort();
+    String nextUrl = generateLinksForAdjacentBlock(NEXT_BLOCK, authority,
+        datanodePort, startOffset, chunkSizeToView, blockSize, blockId,
+        genStamp, dfs, filename, conf, req.getScheme(), tokenString,
+        namenodeInfoPort, nnAddr);
+    if (nextUrl != null) {
       out.print("<a href=\"" + nextUrl + "\">View Next chunk</a>&nbsp;&nbsp;");
     }
-    // determine data for the prev link
-    String prevBlockIdStr = null;
-    String prevGenStamp = null;
-    long prevStartOffset = 0;
-    long prevBlockSize = 0;
-    String prevHost = req.getServerName();
-    int prevPort = req.getServerPort();
-    int prevDatanodePort = datanodePort;
-    if (startOffset == 0) {
-      List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
-          Long.MAX_VALUE).getLocatedBlocks();
-      for (int i = 0; i < blocks.size(); i++) {
-        if (blocks.get(i).getBlock().getBlockId() == blockId) {
-          if (i != 0) {
-            LocatedBlock prevBlock = blocks.get(i - 1);
-            prevBlockIdStr = Long.toString(prevBlock.getBlock().getBlockId());
-            prevGenStamp = Long.toString(prevBlock.getBlock()
-                .getGenerationStamp());
-            prevStartOffset = prevBlock.getBlock().getNumBytes()
-                - chunkSizeToView;
-            if (prevStartOffset < 0)
-              prevStartOffset = 0;
-            prevBlockSize = prevBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(prevBlock, conf);
-            prevDatanodePort = d.getXferPort();
-            prevHost = d.getIpAddr();
-            prevPort = d.getInfoPort();
-          }
-        }
-      }
-    } else {
-      // we are in the same block
-      prevBlockIdStr = blockId.toString();
-      prevStartOffset = startOffset - chunkSizeToView;
-      if (prevStartOffset < 0)
-        prevStartOffset = 0;
-      prevBlockSize = blockSize;
-      prevGenStamp = genStamp.toString();
-    }
 
-    String prevUrl = null;
-    if (prevBlockIdStr != null) {
-      prevUrl = HttpConfig.getSchemePrefix() + canonicalize(prevHost) + ":" + prevPort
-          + "/browseBlock.jsp?blockId=" + prevBlockIdStr
-          + "&blockSize=" + prevBlockSize
-          + "&startOffset=" + prevStartOffset
-          + "&filename=" + URLEncoder.encode(filename, "UTF-8")
-          + "&chunkSizeToView=" + chunkSizeToView
-          + "&genstamp=" + prevGenStamp
-          + "&datanodePort=" + prevDatanodePort
-          + "&namenodeInfoPort=" + namenodeInfoPort
-          + JspHelper.getDelegationTokenUrlParam(tokenString)
-          + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
+    String prevUrl = generateLinksForAdjacentBlock(PREV_BLOCK, authority,
+        datanodePort, startOffset, chunkSizeToView, blockSize, blockId,
+        genStamp, dfs, filename, conf, req.getScheme(), tokenString,
+        namenodeInfoPort, nnAddr);
+    if (prevUrl != null) {
       out.print("<a href=\"" + prevUrl + "\">View Prev chunk</a>&nbsp;&nbsp;");
     }
+
     out.print("<hr>");
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     try {
@@ -621,6 +530,71 @@ public class DatanodeJspHelper {
     dfs.close();
   }
 
+  private static String generateLinksForAdjacentBlock(final int direction,
+      String authority, int datanodePort, long startOffset,
+      int chunkSizeToView, long blockSize, long blockId, Long genStamp,
+      final DFSClient dfs, final String filename, final Configuration conf,
+      final String scheme, final String tokenString,
+      final int namenodeInfoPort, final String nnAddr)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+
+    boolean found = false;
+    if ((direction == NEXT_BLOCK && startOffset + chunkSizeToView < blockSize)
+        || (direction == PREV_BLOCK && startOffset != 0)) {
+      // we are in the same block
+      found = true;
+
+      if (direction == NEXT_BLOCK) {
+        startOffset = startOffset + chunkSizeToView;
+      } else {
+        startOffset = Math.max(0, startOffset - chunkSizeToView);
+      }
+    } else {
+      List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
+          Long.MAX_VALUE).getLocatedBlocks();
+
+      final long curBlockId = blockId;
+      int curBlockIdx = Iterables.indexOf(blocks, new Predicate<LocatedBlock>() {
+        @Override
+        public boolean apply(LocatedBlock b) {
+          return b.getBlock().getBlockId() == curBlockId;
+        }
+      });
+      found = curBlockIdx != -1 &&
+          ((direction == NEXT_BLOCK && curBlockIdx < blocks.size() - 1)
+              || (direction == PREV_BLOCK && curBlockIdx > 0));
+
+      if (found) {
+        LocatedBlock nextBlock = blocks.get(curBlockIdx + direction);
+
+        blockId = nextBlock.getBlock().getBlockId();
+        genStamp = nextBlock.getBlock().getGenerationStamp();
+        startOffset = 0;
+        blockSize = nextBlock.getBlock().getNumBytes();
+        DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
+        datanodePort = d.getXferPort();
+        authority = JspHelper.Url.authority(scheme, d);
+      }
+    }
+
+    if (found) {
+      return "///" + authority
+          + "/browseBlock.jsp?blockId=" + blockId
+          + "&blockSize=" + blockSize
+          + "&startOffset=" + startOffset
+          + "&genstamp=" + genStamp
+          + "&filename=" + URLEncoder.encode(filename, "UTF-8")
+          + "&chunkSizeToView=" + chunkSizeToView
+          + "&datanodePort=" + datanodePort
+          + "&namenodeInfoPort=" + namenodeInfoPort
+          + JspHelper.getDelegationTokenUrlParam(tokenString)
+          + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
+    } else {
+      return null;
+    }
+  }
+
   static void generateFileChunksForTail(JspWriter out, HttpServletRequest req,
                                         Configuration conf
                                         ) throws IOException,

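The refactored generateLinksForAdjacentBlock() above collapses two nearly
identical next/prev loops into one method with a direction parameter, using
Guava's Iterables.indexOf(), which returns the index of the first element
matching a Predicate (or -1 if none matches). A minimal, self-contained
sketch of that idiom; BlockStub is a hypothetical stand-in for LocatedBlock:

    import java.util.Arrays;
    import java.util.List;

    import com.google.common.base.Predicate;
    import com.google.common.collect.Iterables;

    public class AdjacentBlockDemo {
      static class BlockStub {
        final long id;
        BlockStub(long id) { this.id = id; }
      }

      public static void main(String[] args) {
        List<BlockStub> blocks =
            Arrays.asList(new BlockStub(1), new BlockStub(2), new BlockStub(3));
        final long curBlockId = 2;

        // Locate the current block by id, as the refactored helper does.
        int curIdx = Iterables.indexOf(blocks, new Predicate<BlockStub>() {
          @Override
          public boolean apply(BlockStub b) {
            return b.id == curBlockId;
          }
        });

        // direction is NEXT_BLOCK (1) or PREV_BLOCK (-1); guard both ends.
        int direction = 1;
        if (curIdx != -1
            && ((direction == 1 && curIdx < blocks.size() - 1)
                || (direction == -1 && curIdx > 0))) {
          System.out.println("adjacent id = " + blocks.get(curIdx + direction).id);
        }
      }
    }
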
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java Thu Oct 17 02:14:33 2013
@@ -33,6 +33,12 @@ public interface RollingLogs {
   public interface LineIterator extends Iterator<String>, Closeable {
    /** Is the iterator currently iterating over the previous file? */
     public boolean isPrevious();
+
+    /**
+     * Was the last entry read from the previous file? This should only be
+     * called after reading.
+     */
+    public boolean isLastReadFromPrevious();
   }
 
   /**

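The new isLastReadFromPrevious() complements the existing isPrevious(): the
latter reports which file the iterator is currently positioned on, while the
former reports which file the most recently returned line came from. A hedged
usage sketch against this interface (the iterator(false) factory and the two
handlers are assumptions for illustration, not code from this commit):

    RollingLogs.LineIterator it = logs.iterator(false);
    try {
      while (it.hasNext()) {
        String line = it.next();
        // Checked after next(): tells us whether this particular line came
        // from the "prev" file, even if the iterator has already rolled
        // over into the current file.
        if (it.isLastReadFromPrevious()) {
          handlePreviousEntry(line);
        } else {
          handleCurrentEntry(line);
        }
      }
    } finally {
      it.close();
    }
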
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Thu Oct 17 02:14:33 2013
@@ -134,6 +134,7 @@ class RollingLogsImpl implements Rolling
    */
   private class Reader implements RollingLogs.LineIterator {
     private File file;
+    private File lastReadFile;
     private BufferedReader reader;
     private String line;
     private boolean closed = false;
@@ -149,6 +150,11 @@ class RollingLogsImpl implements Rolling
       return file == prev;
     }
 
+    @Override
+    public boolean isLastReadFromPrevious() {
+      return lastReadFile == prev;
+    }
+
     private boolean openFile() throws IOException {
 
       for(int i=0; i<2; i++) {
@@ -203,6 +209,7 @@ class RollingLogsImpl implements Rolling
     public String next() {
       String curLine = line;
       try {
+        lastReadFile = file;
         readNext();
       } catch (IOException e) {
         DataBlockScanner.LOG.warn("Failed to read next line.", e);

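Why the new lastReadFile field is needed: Reader reads one line ahead, and
readNext() may roll over from the prev file to the current one before the
caller consumes the pre-read line, so isPrevious() reflects the read-ahead
position rather than the origin of the line just returned. A toy illustration
of that hazard (two queues stand in for the two log files; none of this is
HDFS code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class LookAheadDemo {
      static Deque<String> prev = new ArrayDeque<String>();
      static Deque<String> curr = new ArrayDeque<String>();
      static Deque<String> file;          // file the look-ahead reads from
      static Deque<String> lastReadFile;  // file the returned line came from
      static String line;                 // the pre-read line

      static void readNext() {
        if (file == prev && prev.isEmpty()) {
          file = curr;                    // roll over, like openFile() does
        }
        line = file.poll();
      }

      static String next() {
        String curLine = line;
        lastReadFile = file;              // record origin before reading ahead
        readNext();
        return curLine;
      }

      public static void main(String[] args) {
        prev.add("p1");
        curr.add("c1");
        file = prev;
        readNext();                                // prime the look-ahead
        next();                                    // returns "p1"
        System.out.println(lastReadFile == prev);  // true: came from prev
        System.out.println(file == prev);          // false: already rolled
      }
    }
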
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Thu Oct 17 02:14:33 2013
@@ -827,7 +827,7 @@ class ClusterJspHelper {
     doc.startTag("item");
     doc.attribute("label", label);
     doc.attribute("value", value);
-    doc.attribute("link", HttpConfig.getSchemePrefix() + url);
+    doc.attribute("link", "///" + url);
     doc.endTag(); // item
   }
 
@@ -887,7 +887,16 @@ class ClusterJspHelper {
 
   private static String queryMbean(String httpAddress, Configuration conf) 
     throws IOException {
-    URL url = new URL(HttpConfig.getSchemePrefix() + httpAddress+JMX_QRY);
+    /*
+     * Although the other namenode might support HTTPS, it is fundamentally
+     * broken to fetch the JMX data over an HTTPS connection inside the
+     * namenode: in an HTTPS setup the principal of the client differs from
+     * that of the namenode, so there is no guarantee that the HTTPS
+     * connection can be established.
+     *
+     * As a result, we just hard-code the connection as an HTTP connection.
+     */
+    URL url = new URL("http://" + httpAddress + JMX_QRY);
     return readOutput(url);
   }
   /**

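The resulting queryMbean() reduces to a plain-HTTP fetch of the JMX servlet.
A hedged, self-contained sketch of that flow (the query string below is an
assumption standing in for the JMX_QRY constant):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class JmxQueryDemo {
      static String queryMbean(String httpAddress) throws IOException {
        // Hard-coded http scheme, per the comment above.
        URL url = new URL("http://" + httpAddress
            + "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo");
        StringBuilder out = new StringBuilder();
        BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), "UTF-8"));
        try {
          String line;
          while ((line = in.readLine()) != null) {
            out.append(line).append('\n');
          }
        } finally {
          in.close();
        }
        return out.toString();
      }

      public static void main(String[] args) throws IOException {
        System.out.println(queryMbean("localhost:50070"));
      }
    }
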
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Oct 17 02:14:33 2013
@@ -2600,10 +2600,12 @@ public class FSDirectory implements Clos
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum);
-        // Set caching information for the located blocks.
-    CacheManager cacheManager = namesystem.getCacheManager();
-    for (LocatedBlock lb: loc.getLocatedBlocks()) {
-      cacheManager.setCachedLocations(lb);
+    // Set caching information for the located blocks.
+    if (loc != null) {
+      CacheManager cacheManager = namesystem.getCacheManager();
+      for (LocatedBlock lb: loc.getLocatedBlocks()) {
+        cacheManager.setCachedLocations(lb);
+      }
     }
     return status;
   }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Oct 17 02:14:33 2013
@@ -124,6 +124,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -168,7 +169,14 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -181,12 +189,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
-import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@@ -198,6 +200,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -210,6 +218,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.RetryCache.CacheEntry;
 import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
@@ -238,6 +247,7 @@ import org.mortbay.util.ajax.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 /**
@@ -440,7 +450,7 @@ public class FSNamesystem implements Nam
   private final long accessTimePrecision;
 
   /** Lock to protect FSNamesystem. */
-  private ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
+  private ReentrantReadWriteLock fsLock;
 
   /**
    * Used when this NN is in standby state to read from the shared edit log.
@@ -459,6 +469,11 @@ public class FSNamesystem implements Nam
   private HAContext haContext;
 
   private final boolean haEnabled;
+  
+  /**
+   * Whether the namenode is in the middle of starting the active service
+   */
+  private volatile boolean startingActiveService = false;
     
   private INodeId inodeId;
   
@@ -615,6 +630,9 @@ public class FSNamesystem implements Nam
    */
   FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
       throws IOException {
+    boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
+    LOG.info("fsLock is fair:" + fair);
+    fsLock = new ReentrantReadWriteLock(fair);
     try {
       resourceRecheckInterval = conf.getLong(
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
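
The hunk above makes the fairness of the namesystem lock configurable
(previously it was hard-coded fair). A fair ReentrantReadWriteLock grants the
lock to the longest-waiting thread, avoiding writer starvation at some cost
in throughput. A small sketch of the knob, reading it from a system property
here instead of a Hadoop Configuration:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class FsLockFairnessDemo {
      public static void main(String[] args) {
        // Mirrors conf.getBoolean("dfs.namenode.fslock.fair", true).
        boolean fair = Boolean.parseBoolean(
            System.getProperty("dfs.namenode.fslock.fair", "true"));
        ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(fair);
        System.out.println("fsLock is fair: " + fsLock.isFair());
      }
    }
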
@@ -911,6 +929,7 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   void startActiveServices() throws IOException {
+    startingActiveService = true;
     LOG.info("Starting services required for active state");
     writeLock();
     try {
@@ -967,8 +986,19 @@ public class FSNamesystem implements Nam
       blockManager.getDatanodeManager().setSendCachingCommands(true);
     } finally {
       writeUnlock();
+      startingActiveService = false;
     }
   }
+  
+  /**
+   * @return Whether the namenode is transitioning to the active state and is
+   *         in the middle of {@link #startActiveServices()}
+   */
+  public boolean inTransitionToActive() {
+    return haEnabled && haContext != null
+        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
+        && startingActiveService;
+  }
 
   private boolean shouldUseDelegationTokens() {
     return UserGroupInformation.isSecurityEnabled() ||
@@ -1063,6 +1093,26 @@ public class FSNamesystem implements Nam
     }
   }
   
+  /**
+   * @throws RetriableException
+   *           If 1) The NameNode is in SafeMode, 2) HA is enabled, and 3)
+   *           NameNode is in active state
+   * @throws SafeModeException
+   *           Otherwise if NameNode is in SafeMode.
+   */
+  private void checkNameNodeSafeMode(String errorMsg)
+      throws RetriableException, SafeModeException {
+    if (isInSafeMode()) {
+      SafeModeException se = new SafeModeException(errorMsg, safeMode);
+      if (haEnabled && haContext != null
+          && haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+        throw new RetriableException(se);
+      } else {
+        throw se;
+      }
+    }
+  }
+  
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
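
Why checkNameNodeSafeMode() wraps rather than rethrows: on an HA-enabled
active namenode, safe mode is normally a transient startup condition, so the
SafeModeException is wrapped in a RetriableException, which the IPC retry
machinery treats as "try again later" instead of a hard failure. A toy model
of the resulting client-side behavior (the exception and retry classes below
are local stand-ins, not Hadoop's IPC layer):

    public class SafeModeRetryDemo {
      static class RetriableException extends Exception {
        RetriableException(String msg) { super(msg); }
      }

      interface Call { void run() throws Exception; }

      static void invokeWithRetries(Call call, int maxRetries) throws Exception {
        for (int attempt = 0; ; attempt++) {
          try {
            call.run();
            return;
          } catch (RetriableException e) {
            if (attempt >= maxRetries) {
              throw e;                      // give up eventually
            }
            Thread.sleep(1000L << attempt); // back off; safe mode should end
          }
          // Anything else (e.g. a plain SafeModeException from a non-HA
          // namenode) propagates to the caller immediately.
        }
      }

      public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        invokeWithRetries(new Call() {
          @Override
          public void run() throws Exception {
            if (calls[0]++ < 2) {
              throw new RetriableException("NameNode still in safe mode");
            }
          }
        }, 5);
        System.out.println("succeeded after " + calls[0] + " attempts");
      }
    }
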
@@ -1364,9 +1414,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set permission for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set permission for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
@@ -1403,9 +1451,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set owner for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set owner for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       if (!pc.isSuperUser()) {
@@ -1485,8 +1531,14 @@ public class FSNamesystem implements Nam
       for (LocatedBlock b : ret.getLocatedBlocks()) {
         // if safemode & no block locations yet then throw safemodeException
         if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
-          throw new SafeModeException("Zero blocklocations for " + src,
-              safeMode);
+          SafeModeException se = new SafeModeException(
+              "Zero blocklocations for " + src, safeMode);
+          if (haEnabled && haContext != null && 
+              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+            throw new RetriableException(se);
+          } else {
+            throw se;
+          }
         }
       }
     }
@@ -1633,9 +1685,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot concat " + target, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot concat " + target);
       concatInternal(pc, target, srcs, logRetryCache);
       resultingStat = getAuditFileInfo(target, false);
     } finally {
@@ -1783,9 +1833,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set times " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set times " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       // Write access is required to set access and modification times
@@ -1812,16 +1860,16 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
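
The same reordering recurs below in renameTo, saveNamespace, endCheckpoint
and updatePipeline: cheap argument and operation-category checks now run
before RetryCache.waitForCompletion(), so a request that would fail
validation anyway never claims a retry-cache entry. A toy model of the
ordering (the real RetryCache keys on RPC call ids; this map-based cache is
an illustration only):

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;

    public class RetryOrderingDemo {
      private final ConcurrentHashMap<String, Boolean> retryCache =
          new ConcurrentHashMap<String, Boolean>();

      void operation(String callId, String arg) throws IOException {
        // 1. Validate first (this used to happen after the cache lookup).
        if (arg == null || arg.isEmpty()) {
          throw new IOException("Invalid argument: " + arg);
        }
        // 2. Only now consult the retry cache.
        if (Boolean.TRUE.equals(retryCache.get(callId))) {
          return; // a previous attempt already succeeded
        }
        boolean success = false;
        try {
          doWork(arg);
          success = true;
        } finally {
          retryCache.put(callId, success); // record the outcome for retries
        }
      }

      private void doWork(String arg) { /* the actual mutation */ }
    }
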
@@ -1848,9 +1896,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create symlink " + link, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create symlink " + link);
       link = FSDirectory.resolvePath(link, pathComponents, dir);
       if (!createParent) {
         verifyParentDir(link);
@@ -1908,9 +1954,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set replication for " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set replication for " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
@@ -2040,9 +2084,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       startFileInternal(pc, src, permissions, holder, clientMachine, create,
           overwrite, createParent, replication, blockSize, logRetryCache);
@@ -2261,10 +2303,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot recover the lease of " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot recover the lease of " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
@@ -2415,9 +2454,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot append to file" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot append to file" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
     } catch (StandbyException se) {
@@ -2464,7 +2501,7 @@ public class FSNamesystem implements Nam
    * client to "try again later".
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, HashMap<Node, Node> excludedNodes, 
+      ExtendedBlock previous, Set<Node> excludedNodes, 
       List<String> favoredNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
@@ -2567,9 +2604,7 @@ public class FSNamesystem implements Nam
     checkBlock(previous);
     onRetryBlock[0] = null;
     checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot add block to " + src, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot add block to " + src);
 
     // have we exceeded the configured limit of fs objects.
     checkFsObjectLimit();
@@ -2663,7 +2698,7 @@ public class FSNamesystem implements Nam
 
   /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
   LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
+      final DatanodeInfo[] existings,  final Set<Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
     //check if the feature is enabled
@@ -2678,10 +2713,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.READ);
       //check safe mode
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add datanode; src=" + src
-            + ", blk=" + blk, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //check lease
@@ -2726,10 +2758,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot abandon block " + b +
-                                    " for fle" + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot abandon block " + b + " for fle" + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //
@@ -2812,9 +2841,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot complete file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot complete file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       success = completeFileInternal(src, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
@@ -2990,9 +3017,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       checkOperation(OperationCategory.WRITE);
@@ -3042,10 +3067,6 @@ public class FSNamesystem implements Nam
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -3053,8 +3074,13 @@ public class FSNamesystem implements Nam
     if (!DFSUtil.isValidName(dst)) {
       throw new InvalidPathException("Invalid name: " + dst);
     }
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
+    
     checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
@@ -3062,9 +3088,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename " + src);
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, cacheEntry != null, options);
@@ -3170,9 +3194,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot delete " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot delete " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new IOException(src + " is non empty");
@@ -3391,9 +3413,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);   
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create directory " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create directory " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
@@ -3493,9 +3513,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot set quota on " + path, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot set quota on " + path);
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3518,9 +3536,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot fsync file " + src, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot fsync file " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       if (lastBlockLength > 0) {
@@ -3724,6 +3740,39 @@ public class FSNamesystem implements Nam
   BlockInfo getStoredBlock(Block block) {
     return blockManager.getStoredBlock(block);
   }
+  
+  @Override
+  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
+    assert hasReadOrWriteLock();
+    final BlockCollection bc = blockUC.getBlockCollection();
+    if (bc == null || !(bc instanceof INodeFileUnderConstruction)) {
+      return false;
+    }
+
+    INodeFileUnderConstruction inodeUC = (INodeFileUnderConstruction) blockUC
+        .getBlockCollection();
+    String fullName = inodeUC.getName();
+    try {
+      if (fullName != null && fullName.startsWith(Path.SEPARATOR)
+          && dir.getINode(fullName) == inodeUC) {
+        // If file exists in normal path then no need to look in snapshot
+        return false;
+      }
+    } catch (UnresolvedLinkException e) {
+      LOG.error("Error while resolving the link : " + fullName, e);
+      return false;
+    }
+    /*
+     * 1. if bc is an instance of INodeFileUnderConstructionWithSnapshot, and
+     * bc is not in the current fsdirectory tree, bc must represent a snapshot
+     * file. 
+     * 2. if fullName is not an absolute path, bc cannot exist in the
+     * current fsdirectory tree. 
+     * 3. if bc is not the current node associated with fullName, bc must be a
+     * snapshot inode.
+     */
+    return true;
+  }
 
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
@@ -3745,11 +3794,8 @@ public class FSNamesystem implements Nam
       // If a DN tries to commit to the standby, the recovery will
       // fail, and the next retry will succeed on the new NN.
   
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-          "Cannot commitBlockSynchronization while in safe mode",
-          safeMode);
-      }
+      checkNameNodeSafeMode(
+          "Cannot commitBlockSynchronization while in safe mode");
       final BlockInfo storedBlock = getStoredBlock(
           ExtendedBlock.getLocalBlock(lastblock));
       if (storedBlock == null) {
@@ -3895,9 +3941,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew lease for " + holder);
       leaseManager.renewLease(holder);
     } finally {
       writeUnlock();
@@ -3934,11 +3978,27 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    String startAfterString = new String(startAfter);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
 
+      // Get file name when startAfter is an INodePath
+      if (FSDirectory.isReservedName(startAfterString)) {
+        byte[][] startAfterComponents = FSDirectory
+            .getPathComponentsForReservedPath(startAfterString);
+        try {
+          String tmp = FSDirectory.resolvePath(src, startAfterComponents, dir);
+          byte[][] regularPath = INode.getPathComponents(tmp);
+          startAfter = regularPath[regularPath.length - 1];
+        } catch (IOException e) {
+          // Possibly the inode is deleted
+          throw new DirectoryListingStartAfterNotFoundException(
+              "Can't find startAfter " + startAfterString);
+        }
+      }
+      
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
           checkPathAccess(pc, src, FsAction.READ_EXECUTE);
@@ -4220,6 +4280,14 @@ public class FSNamesystem implements Nam
     return this.snapshotManager.getNumSnapshots();
   }
 
+  @Override
+  public String getSnapshotStats() {
+    Map<String, Object> info = new HashMap<String, Object>();
+    info.put("SnapshottableDirectories", this.getNumSnapshottableDirs());
+    info.put("Snapshots", this.getNumSnapshots());
+    return JSON.toString(info);
+  }
+
   int getNumberOfDatanodes(DatanodeReportType type) {
     readLock();
     try {
@@ -4259,19 +4327,20 @@ public class FSNamesystem implements Nam
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+    checkSuperuserPrivilege();
+    
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkSuperuserPrivilege();
-    checkOperation(OperationCategory.UNCHECKED);
     boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.UNCHECKED);
       if (!isInSafeMode()) {
-        throw new IOException("Safe mode should be turned ON " +
-                              "in order to create namespace image.");
+        throw new IOException("Safe mode should be turned ON "
+            + "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
       success = true;
@@ -4348,7 +4417,7 @@ public class FSNamesystem implements Nam
    * replicas, and calculates the ratio of safe blocks to the total number
    * of blocks in the system, which is the size of blocks in
    * {@link FSNamesystem#blockManager}. When the ratio reaches the
-   * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
+   * {@link #threshold} it starts the SafeModeMonitor daemon in order
    * to monitor whether the safe mode {@link #extension} is passed.
    * Then it leaves safe mode and destroys itself.
    * <p>
@@ -4356,10 +4425,9 @@ public class FSNamesystem implements Nam
    * not tracked because the name node is not intended to leave safe mode
    * automatically in the case.
    *
-   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction)
-   * @see SafeModeMonitor
+   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
    */
-  class SafeModeInfo {
+  public class SafeModeInfo {
     // configuration fields
     /** Safe mode threshold condition %.*/
     private double threshold;
@@ -4572,7 +4640,7 @@ public class FSNamesystem implements Nam
      */
     private boolean needEnter() {
       return (threshold != 0 && blockSafe < blockThreshold) ||
-        (getNumLiveDataNodes() < datanodeThreshold) ||
+        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
         (!nameNodeHasResourcesAvailable());
     }
       
@@ -5101,9 +5169,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Log not rolled", safeMode);
-      }
+      checkNameNodeSafeMode("Log not rolled");
       LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
       return getFSImage().rollEditLog();
     } finally {
@@ -5124,9 +5190,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not started", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not started");
       LOG.info("Start checkpoint for " + backupNode.getAddress());
       cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
@@ -5150,19 +5214,17 @@ public class FSNamesystem implements Nam
   
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.CHECKPOINT);
     boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Checkpoint not ended", safeMode);
-      }
+      checkNameNodeSafeMode("Checkpoint not ended");
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
       success = true;
@@ -5263,7 +5325,8 @@ public class FSNamesystem implements Nam
   /**
    * Get the total number of objects in the system. 
    */
-  long getMaxObjects() {
+  @Override // FSNamesystemMBean
+  public long getMaxObjects() {
     return maxFsObjects;
   }
 
@@ -5408,7 +5471,7 @@ public class FSNamesystem implements Nam
   @Override // FSNamesystemMBean
   public int getNumDecomDeadDataNodes() {
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    getBlockManager().getDatanodeManager().fetchDatanodes(dead, null, true);
+    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
     int deadDecommissioned = 0;
     for (DatanodeDescriptor node : dead) {
       deadDecommissioned += node.isDecommissioned() ? 1 : 0;
@@ -5417,6 +5480,12 @@ public class FSNamesystem implements Nam
   }
 
   @Override // FSNamesystemMBean
+  public int getNumDecommissioningDataNodes() {
+    return getBlockManager().getDatanodeManager().getDecommissioningNodes()
+        .size();
+  }
+
+  @Override // FSNamesystemMBean
   @Metric({"StaleDataNodes", 
     "Number of datanodes marked stale due to delayed heartbeat"})
   public int getNumStaleDataNodes() {
@@ -5514,10 +5583,7 @@ public class FSNamesystem implements Nam
   long nextGenerationStamp(boolean legacyBlock)
       throws IOException, SafeModeException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next generation stamp", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next generation stamp");
 
     long gs;
     if (legacyBlock) {
@@ -5570,12 +5636,9 @@ public class FSNamesystem implements Nam
   /**
    * Increments, logs and then returns the block ID
    */
-  private long nextBlockId() throws SafeModeException {
+  private long nextBlockId() throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot get next block ID", safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get next block ID");
     final long blockId = blockIdGenerator.nextValue();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
@@ -5585,10 +5648,8 @@ public class FSNamesystem implements Nam
   private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
     assert hasWriteLock();
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot get a new generation stamp and an " +
-                                "access token for block " + block, safeMode);
-    }
+    checkNameNodeSafeMode("Cannot get a new generation stamp and an "
+        + "access token for block " + block);
     
     // check stored block state
     BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
@@ -5686,11 +5747,11 @@ public class FSNamesystem implements Nam
   void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -5701,9 +5762,7 @@ public class FSNamesystem implements Nam
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Pipeline not updated", safeMode);
-      }
+      checkNameNodeSafeMode("Pipeline not updated");
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
@@ -5963,9 +6022,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot issue delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot issue delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
           "Delegation Token can be issued only with kerberos or web authentication");
@@ -6010,9 +6067,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot renew delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot renew delegation token");
       if (!isAllowedDelegationTokenOp()) {
         throw new IOException(
             "Delegation Token can be renewed only with kerberos or web authentication");
@@ -6043,9 +6098,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
 
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot cancel delegation token", safeMode);
-      }
+      checkNameNodeSafeMode("Cannot cancel delegation token");
       String canceller = getRemoteUser().getUserName();
       DelegationTokenIdentifier id = dtSecretManager
         .cancelToken(token, canceller);
@@ -6271,14 +6324,25 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
     for (DatanodeDescriptor node : live) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("lastContact", getLastContact(node));
-      innerinfo.put("usedSpace", getDfsUsed(node));
-      innerinfo.put("adminState", node.getAdminState().toString());
-      innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
-      innerinfo.put("capacity", node.getCapacity());
-      innerinfo.put("numBlocks", node.numBlocks());
-      innerinfo.put("version", node.getSoftwareVersion());
+      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
+          .put("infoAddr", node.getInfoAddr())
+          .put("infoSecureAddr", node.getInfoSecureAddr())
+          .put("xferaddr", node.getXferAddr())
+          .put("lastContact", getLastContact(node))
+          .put("usedSpace", getDfsUsed(node))
+          .put("adminState", node.getAdminState().toString())
+          .put("nonDfsUsedSpace", node.getNonDfsUsed())
+          .put("capacity", node.getCapacity())
+          .put("numBlocks", node.numBlocks())
+          .put("version", node.getSoftwareVersion())
+          .put("used", node.getDfsUsed())
+          .put("remaining", node.getRemaining())
+          .put("blockScheduled", node.getBlocksScheduled())
+          .put("blockPoolUsed", node.getBlockPoolUsed())
+          .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
+          .put("volfails", node.getVolumeFailures())
+          .build();
+
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -6295,9 +6359,11 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
     for (DatanodeDescriptor node : dead) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("lastContact", getLastContact(node));
-      innerinfo.put("decommissioned", node.isDecommissioned());
+      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
+          .put("lastContact", getLastContact(node))
+          .put("decommissioned", node.isDecommissioned())
+          .put("xferaddr", node.getXferAddr())
+          .build();
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -6314,13 +6380,16 @@ public class FSNamesystem implements Nam
     final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
         ).getDecommissioningNodes();
     for (DatanodeDescriptor node : decomNodeList) {
-      final Map<String, Object> innerinfo = new HashMap<String, Object>();
-      innerinfo.put("underReplicatedBlocks", node.decommissioningStatus
-          .getUnderReplicatedBlocks());
-      innerinfo.put("decommissionOnlyReplicas", node.decommissioningStatus
-          .getDecommissionOnlyReplicas());
-      innerinfo.put("underReplicateInOpenFiles", node.decommissioningStatus
-          .getUnderReplicatedInOpenFiles());
+      Map<String, Object> innerinfo = ImmutableMap
+          .<String, Object> builder()
+          .put("xferaddr", node.getXferAddr())
+          .put("underReplicatedBlocks",
+              node.decommissioningStatus.getUnderReplicatedBlocks())
+          .put("decommissionOnlyReplicas",
+              node.decommissioningStatus.getDecommissionOnlyReplicas())
+          .put("underReplicateInOpenFiles",
+              node.decommissioningStatus.getUnderReplicatedInOpenFiles())
+          .build();
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);
@@ -6514,11 +6583,17 @@ public class FSNamesystem implements Nam
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
    * @param password Password in the token.
-   * @throws InvalidToken
    */
   public synchronized void verifyToken(DelegationTokenIdentifier identifier,
-      byte[] password) throws InvalidToken {
-    getDelegationTokenSecretManager().verifyToken(identifier, password);
+      byte[] password) throws InvalidToken, RetriableException {
+    try {
+      getDelegationTokenSecretManager().verifyToken(identifier, password);
+    } catch (InvalidToken it) {
+      if (inTransitionToActive()) {
+        throw new RetriableException(it);
+      }
+      throw it;
+    }
   }
   
   @Override
@@ -6536,6 +6611,11 @@ public class FSNamesystem implements Nam
   }
   
   @VisibleForTesting
+  public void setEditLogTailerForTests(EditLogTailer tailer) {
+    this.editLogTailer = tailer;
+  }
+  
+  @VisibleForTesting
   void setFsLockForTests(ReentrantReadWriteLock lock) {
     this.fsLock = lock;
   }
@@ -6570,10 +6650,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot allow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot allow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6598,10 +6675,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot disallow snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
       checkSuperuserPrivilege();
 
       dir.writeLock();
@@ -6628,20 +6702,18 @@ public class FSNamesystem implements Nam
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (String) cacheEntry.getPayload();
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot create snapshot for "
-            + snapshotRoot, safeMode);
-      }
+      checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
       if (isPermissionEnabled) {
         checkOwner(pc, snapshotRoot);
       }
@@ -6680,19 +6752,17 @@ public class FSNamesystem implements Nam
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot rename snapshot for " + path,
-            safeMode);
-      }
+      checkNameNodeSafeMode("Cannot rename snapshot for " + path);
       if (isPermissionEnabled) {
         checkOwner(pc, path);
       }
@@ -6725,10 +6795,10 @@ public class FSNamesystem implements Nam
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException {
     SnapshottableDirectoryStatus[] status = null;
+    final FSPermissionChecker checker = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      FSPermissionChecker checker = getPermissionChecker();
       final String user = checker.isSuperUser()? null : checker.getUser();
       status = snapshotManager.getSnapshottableDirListing(user);
     } finally {
@@ -6796,21 +6866,21 @@ public class FSNamesystem implements Nam
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
+    
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
     boolean success = false;
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-            "Cannot delete snapshot for " + snapshotRoot, safeMode);
+      checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
+      if (isPermissionEnabled) {
+        checkOwner(pc, snapshotRoot);
       }
-      checkOwner(pc, snapshotRoot);
 
       BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
       List<INode> removedINodes = new ChunkedArrayList<INode>();
@@ -7113,8 +7183,11 @@ public class FSNamesystem implements Nam
           }
           sb.append(trackingId);
         }
-        auditLog.info(sb);
+        logAuditMessage(sb.toString());
       }
     }
+    public void logAuditMessage(String message) {
+      auditLog.info(message);
+    }
   }
 }
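
Note on the recurring edit above: every inline "if (isInSafeMode()) { throw new
SafeModeException(...); }" block in FSNamesystem is collapsed into a call to
checkNameNodeSafeMode(String). The helper's definition lives in an earlier hunk
of this patch, not shown here; a minimal sketch consistent with the call sites
above would be:

  // Minimal sketch reconstructed from the replaced call sites; the committed
  // helper may additionally wrap the exception (for example in a
  // RetriableException while an HA transition is in progress).
  private void checkNameNodeSafeMode(String errorMsg)
      throws SafeModeException {
    if (isInSafeMode()) {
      throw new SafeModeException(errorMsg, safeMode);
    }
  }

This also explains the loosened signatures: nextBlockId() and friends now
declare plain IOException, which SafeModeException extends.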

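A second pattern repeated above: updatePipeline, createSnapshot, renameSnapshot
and deleteSnapshot now run checkOperation(OperationCategory.WRITE), and fetch
the FSPermissionChecker, before consulting the RetryCache, so a standby
NameNode rejects the RPC up front instead of reading or populating the cache.
The resulting shape of these write ops, sketched with illustrative names
(someWriteOp is not a real method in the patch):

  // Illustrative shape only; see the concrete ops in the hunks above.
  void someWriteOp() throws IOException {
    checkOperation(OperationCategory.WRITE);    // fail fast on standby
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;                                   // retried call: reuse old answer
    }
    boolean success = false;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);  // re-check under the lock
      checkNameNodeSafeMode("Cannot ...");
      // ... the actual mutation ...
      success = true;
    } finally {
      writeUnlock();
      RetryCache.setState(cacheEntry, success);
    }
  }
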
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Thu Oct 17 02:14:33 2013
@@ -57,9 +57,14 @@ public class FileChecksumServlets {
       final String hostname = host instanceof DatanodeInfo 
           ? ((DatanodeInfo)host).getHostName() : host.getIpAddr();
       final String scheme = request.getScheme();
-      final int port = "https".equals(scheme)
-          ? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY)
-          : host.getInfoPort();
+      int port = host.getInfoPort();
+      if ("https".equals(scheme)) {
+        final Integer portObject = (Integer) getServletContext().getAttribute(
+            DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
+        if (portObject != null) {
+          port = portObject;
+        }
+      }
       final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum");
 
       String dtParam = "";

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Thu Oct 17 02:14:33 2013
@@ -61,9 +61,14 @@ public class FileDataServlet extends Dfs
     } else {
       hostname = host.getIpAddr();
     }
-    final int port = "https".equals(scheme)
-      ? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY)
-      : host.getInfoPort();
+    int port = host.getInfoPort();
+    if ("https".equals(scheme)) {
+      final Integer portObject = (Integer) getServletContext().getAttribute(
+          DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
+      if (portObject != null) {
+        port = portObject;
+      }
+    }
 
     String dtParam = "";
     if (dt != null) {

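FileChecksumServlets and FileDataServlet get the same fix: the old code cast
the DFS_DATANODE_HTTPS_PORT_KEY servlet-context attribute to Integer
unconditionally, which NPEs on unboxing whenever the attribute was never set.
The new code keeps host.getInfoPort() as the fallback. The shared logic,
pulled out as a hypothetical helper (the method name is illustrative; both
servlets inline it):

  // Hypothetical helper, not part of the patch; assumes the usual
  // javax.servlet imports and a DatanodeID "host" as in both servlets.
  private int resolveRedirectPort(String scheme, ServletContext context,
      DatanodeID host) {
    int port = host.getInfoPort();              // default: HTTP info port
    if ("https".equals(scheme)) {
      final Integer httpsPort = (Integer) context.getAttribute(
          DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
      if (httpsPort != null) {                  // attribute may be absent
        port = httpsPort;
      }
    }
    return port;
  }
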
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Thu Oct 17 02:14:33 2013
@@ -52,6 +52,7 @@ public class NameNodeHttpServer {
   private final NameNode nn;
   
   private InetSocketAddress httpAddress;
+  private InetSocketAddress httpsAddress;
   private InetSocketAddress bindAddress;
   
   public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
@@ -99,14 +100,15 @@ public class NameNodeHttpServer {
     boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
     if (certSSL) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
-      InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoHost + ":" + conf.get(
-        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, infoHost + ":" + 0));
+      httpsAddress = NetUtils.createSocketAddr(conf.get(
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
+
       Configuration sslConf = new Configuration(false);
-      if (certSSL) {
-        sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
-                                     "ssl-server.xml"));
-      }
-      httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
+      sslConf.addResource(conf.get(
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+      httpServer.addSslListener(httpsAddress, sslConf, needClientAuth);
       // assume same ssl port for all datanodes
       InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
         DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475));
@@ -163,6 +165,10 @@ public class NameNodeHttpServer {
     return httpAddress;
   }
 
+  public InetSocketAddress getHttpsAddress() {
+    return httpsAddress;
+  }
+
   /**
    * Sets fsimage for use by servlets.
    * 

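NameNodeHttpServer now reads the complete https bind address from
dfs.namenode.https-address instead of gluing the info host to a separately
configured port, stores it in the new httpsAddress field, and exposes it
through getHttpsAddress(). Resolving the address the same way from other code
would look roughly like this (sketch only; the 0.0.0.0:50470 value behind
DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT is assumed, as it is not shown in this
hunk):

  // Sketch: resolve the NN https address as the server now does.
  Configuration conf = new Configuration();
  InetSocketAddress httpsAddr = NetUtils.createSocketAddr(conf.get(
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,       // dfs.namenode.https-address
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT)); // assumed 0.0.0.0:50470
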
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Oct 17 02:14:33 2013
@@ -29,8 +29,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -553,11 +554,11 @@ class NameNodeRpcServer implements Namen
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
           + " fileId=" + fileId + " for " + clientName);
     }
-    HashMap<Node, Node> excludedNodesSet = null;
+    Set<Node> excludedNodesSet = null;
     if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      excludedNodesSet = new HashSet<Node>(excludedNodes.length);
       for (Node node : excludedNodes) {
-        excludedNodesSet.put(node, node);
+        excludedNodesSet.add(node);
       }
     }
     List<String> favoredNodesList = (favoredNodes == null) ? null
@@ -585,11 +586,11 @@ class NameNodeRpcServer implements Namen
 
     metrics.incrGetAdditionalDatanodeOps();
 
-    HashMap<Node, Node> excludeSet = null;
+    Set<Node> excludeSet = null;
     if (excludes != null) {
-      excludeSet = new HashMap<Node, Node>(excludes.length);
+      excludeSet = new HashSet<Node>(excludes.length);
       for (Node node : excludes) {
-        excludeSet.put(node, node);
+        excludeSet.add(node);
       }
     }
     return namesystem.getAdditionalDatanode(src, blk,

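In NameNodeRpcServer the exclude lists stop abusing HashMap<Node, Node> as a
set. The explicit loop in the patch is a direct translation of the old code;
the same construction can also be written in one expression (equivalent
behavior, not what the commit uses):

  // Equivalent one-liner; the commit keeps the explicit loop.
  Set<Node> excludedNodesSet = (excludedNodes == null) ? null
      : new HashSet<Node>(Arrays.asList(excludedNodes));
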
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Thu Oct 17 02:14:33 2013
@@ -30,6 +30,8 @@ import java.net.URLEncoder;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +103,10 @@ class NamenodeJspHelper {
   }
 
   static String getRollingUpgradeText(FSNamesystem fsn) {
+    if (fsn == null) {
+      return "";
+    }
+
     DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
     Map<String, Integer> list = dm.getDatanodesSoftwareVersions();
     if(list.size() > 1) {
@@ -203,6 +209,20 @@ class NamenodeJspHelper {
     return "";
   }
 
+  static void generateSnapshotReport(JspWriter out, FSNamesystem fsn)
+      throws IOException {
+    if (fsn == null) {
+      return;
+    }
+    out.println("<div id=\"snapshotstats\"><div class=\"dfstable\">"
+        + "<table class=\"storage\" title=\"Snapshot Summary\">\n"
+        + "<thead><tr><td><b>Snapshottable directories</b></td>"
+        + "<td><b>Snapshotted directories</b></td></tr></thead>");
+
+    out.println(String.format("<td>%d</td><td>%d</td>", fsn.getNumSnapshottableDirs(), fsn.getNumSnapshots()));
+    out.println("</table></div></div>");
+  }
+
   static class HealthJsp {
     private int rowNum = 0;
     private int colNum = 0;
@@ -636,25 +656,22 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     // We can't redirect if there isn't a DN to redirect to.
     // Lets instead show a proper error message.
-    if (nn.getNamesystem().getNumLiveDataNodes() < 1) {
+    FSNamesystem fsn = nn.getNamesystem();
+
+    DatanodeID datanode = null;
+    if (fsn != null && fsn.getNumLiveDataNodes() >= 1) {
+      datanode = getRandomDatanode(nn);
+    }
+
+    if (datanode == null) {
       throw new IOException("Can't browse the DFS since there are no " +
           "live nodes available to redirect to.");
     }
-    final DatanodeID datanode = getRandomDatanode(nn);;
+
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
+    // if the user is defined, get a delegation token and stringify it
     String tokenString = getDelegationToken(
         nn.getRpcServer(), request, conf, ugi);
-    // if the user is defined, get a delegation token and stringify it
-    final String redirectLocation;
-    final String nodeToRedirect;
-    int redirectPort;
-    if (datanode != null) {
-      nodeToRedirect = datanode.getIpAddr();
-      redirectPort = datanode.getInfoPort();
-    } else {
-      nodeToRedirect = nn.getHttpAddress().getHostName();
-      redirectPort = nn.getHttpAddress().getPort();
-    }
 
     InetSocketAddress rpcAddr = nn.getNameNodeAddress();
     String rpcHost = rpcAddr.getAddress().isAnyLocalAddress()
@@ -662,16 +679,31 @@ class NamenodeJspHelper {
       : rpcAddr.getAddress().getHostAddress();
     String addr = rpcHost + ":" + rpcAddr.getPort();
 
-    String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();
-    redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" + redirectPort
+    final String redirectLocation =
+        JspHelper.Url.url(request.getScheme(), datanode)
         + "/browseDirectory.jsp?namenodeInfoPort="
-        + nn.getHttpAddress().getPort() + "&dir=/"
+        + request.getServerPort() + "&dir=/"
         + (tokenString == null ? "" :
            JspHelper.getDelegationTokenUrlParam(tokenString))
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
+
     resp.sendRedirect(redirectLocation);
   }
 
+  /**
+   * Returns a descriptive label for the running NameNode.  If the NameNode has
+   * initialized to the point of running its RPC server, then this label consists
+   * of the host and port of the RPC server.  Otherwise, the label is a message
+   * stating that the NameNode is still initializing.
+   * 
+   * @param nn NameNode to describe
+   * @return String NameNode label
+   */
+  static String getNameNodeLabel(NameNode nn) {
+    return nn.getRpcServer() != null ? nn.getNameNodeAddressHostPortString() :
+      "initializing";
+  }
+
   static class NodeListJsp {
     private int rowNum = 0;
 
@@ -709,12 +741,11 @@ class NamenodeJspHelper {
     }
 
     private void generateNodeDataHeader(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
       // from nn_browsedfscontent.jsp:
-      String url = HttpConfig.getSchemePrefix() + d.getHostName() + ":"
-          + d.getInfoPort()
-          + "/browseDirectory.jsp?namenodeInfoPort=" + nnHttpPort + "&dir="
+      String url = "///" + JspHelper.Url.authority(scheme, d)
+          + "/browseDirectory.jsp?namenodeInfoPort=" + nnInfoPort + "&dir="
           + URLEncoder.encode("/", "UTF-8")
           + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnaddr);
 
@@ -731,9 +762,9 @@ class NamenodeJspHelper {
     }
 
     void generateDecommissioningNodeData(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
       if (!alive) {
         return;
       }
@@ -757,7 +788,7 @@ class NamenodeJspHelper {
     }
     
     void generateNodeData(JspWriter out, DatanodeDescriptor d, String suffix,
-        boolean alive, int nnHttpPort, String nnaddr) throws IOException {
+        boolean alive, int nnInfoPort, String nnaddr, String scheme) throws IOException {
       /*
        * Say the datanode is dn1.hadoop.apache.org with ip 192.168.0.5 we use:
        * 1) d.getHostName():d.getPort() to display. Domain and port are stripped
@@ -769,10 +800,14 @@ class NamenodeJspHelper {
        * interact with datanodes.
        */
 
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
+      long currentTime = Time.now();
+      long timestamp = d.getLastUpdate();
       if (!alive) {
-        out.print("<td class=\"decommissioned\"> " + 
-            d.isDecommissioned() + "\n");
+        out.print("<td class=\"lastcontact\"> "
+            + new Date(timestamp) 
+            + "<td class=\"decommissioned\"> "
+            + d.isDecommissioned() + "\n");
         return;
       }
 
@@ -785,9 +820,6 @@ class NamenodeJspHelper {
       String percentRemaining = fraction2String(d.getRemainingPercent());
 
       String adminState = d.getAdminState().toString();
-
-      long timestamp = d.getLastUpdate();
-      long currentTime = Time.now();
       
       long bpUsed = d.getBlockPoolUsed();
       String percentBpUsed = fraction2String(d.getBlockPoolUsedPercent());
@@ -826,17 +858,17 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
       final FSNamesystem ns = nn.getNamesystem();
+      if (ns == null) {
+        return;
+      }
       final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
       final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
       final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       dm.fetchDatanodes(live, dead, true);
 
-      InetSocketAddress nnSocketAddress =
-          (InetSocketAddress)context.getAttribute(
-              NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
-      String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
-          + nnSocketAddress.getPort();
+      String nnaddr = nn.getServiceRpcAddress().getAddress().getHostName() + ":"
+          + nn.getServiceRpcAddress().getPort();
 
       whatNodes = request.getParameter("whatNodes"); // show only live or only
                                                      // dead nodes
@@ -872,16 +904,11 @@ class NamenodeJspHelper {
 
       counterReset();
 
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-
       if (live.isEmpty() && dead.isEmpty()) {
         out.print("There are no datanodes in the cluster");
       } else {
 
-        int nnHttpPort = nn.getHttpAddress().getPort();
+        int nnInfoPort = request.getServerPort();
         out.print("<div id=\"dfsnodetable\"> ");
         if (whatNodes.equals("LIVE")) {
           out.print("<a name=\"LiveNodes\" id=\"title\">" + "Live Datanodes : "
@@ -923,8 +950,8 @@ class NamenodeJspHelper {
 
             JspHelper.sortNodeList(live, sorterField, sorterOrder);
             for (int i = 0; i < live.size(); i++) {
-              generateNodeData(out, live.get(i), port_suffix, true, nnHttpPort,
-                  nnaddr);
+              generateNodeData(out, live.get(i), port_suffix, true, nnInfoPort,
+                  nnaddr, request.getScheme());
             }
           }
           out.print("</table>\n");
@@ -938,13 +965,15 @@ class NamenodeJspHelper {
                 + "<th " + nodeHeaderStr("node")
                 + "> Node <th " + nodeHeaderStr("address")
                 + "> Transferring<br>Address <th "
+                + nodeHeaderStr("lastcontact")
+                + "> Last <br>Contact <th "
                 + nodeHeaderStr("decommissioned")
                 + "> Decommissioned\n");
 
             JspHelper.sortNodeList(dead, sorterField, sorterOrder);
             for (int i = 0; i < dead.size(); i++) {
               generateNodeData(out, dead.get(i), port_suffix, false,
-                  nnHttpPort, nnaddr);
+                  nnInfoPort, nnaddr, request.getScheme());
             }
 
             out.print("</table>\n");
@@ -975,7 +1004,7 @@ class NamenodeJspHelper {
             JspHelper.sortNodeList(decommissioning, "name", "ASC");
             for (int i = 0; i < decommissioning.size(); i++) {
               generateDecommissioningNodeData(out, decommissioning.get(i),
-                  port_suffix, true, nnHttpPort, nnaddr);
+                  port_suffix, true, nnInfoPort, nnaddr, request.getScheme());
             }
             out.print("</table>\n");
           }
@@ -1003,14 +1032,16 @@ class NamenodeJspHelper {
     final BlockManager blockManager;
     
     XMLBlockInfo(FSNamesystem fsn, Long blockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
 
       if (blockId == null) {
         this.block = null;
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = ((INode)blockManager.getBlockCollection(block)).asFile();
+        this.inode = blockManager != null ?
+          ((INode)blockManager.getBlockCollection(block)).asFile() :
+          null;
       }
     }
 
@@ -1084,8 +1115,10 @@ class NamenodeJspHelper {
         } 
 
         doc.startTag("replicas");
-        for(final Iterator<DatanodeDescriptor> it = blockManager.datanodeIterator(block);
-            it.hasNext(); ) {
+        for (final Iterator<DatanodeDescriptor> it = blockManager != null ?
+            blockManager.datanodeIterator(block) :
+            Collections.<DatanodeDescriptor>emptyList().iterator();
+            it.hasNext();) {
           doc.startTag("replica");
 
           DatanodeDescriptor dd = it.next();
@@ -1121,7 +1154,7 @@ class NamenodeJspHelper {
     
     XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
                                int numCorruptBlocks, Long startingBlockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
       this.conf = conf;
       this.numCorruptBlocks = numCorruptBlocks;
       this.startingBlockId = startingBlockId;
@@ -1144,16 +1177,19 @@ class NamenodeJspHelper {
       doc.endTag();
       
       doc.startTag("num_missing_blocks");
-      doc.pcdata(""+blockManager.getMissingBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getMissingBlocksCount() : 0));
       doc.endTag();
       
       doc.startTag("num_corrupt_replica_blocks");
-      doc.pcdata(""+blockManager.getCorruptReplicaBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getCorruptReplicaBlocksCount() : 0));
       doc.endTag();
      
       doc.startTag("corrupt_replica_block_ids");
-      final long[] corruptBlockIds = blockManager.getCorruptReplicaBlockIds(
-          numCorruptBlocks, startingBlockId);
+      final long[] corruptBlockIds = blockManager != null ?
+        blockManager.getCorruptReplicaBlockIds(numCorruptBlocks,
+        startingBlockId) : null;
       if (corruptBlockIds != null) {
         for (Long blockId: corruptBlockIds) {
           doc.startTag("block_id");

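Most of the NamenodeJspHelper changes are null guards for the window in which
the web UI is reachable but the FSNamesystem or BlockManager is not available
yet (e.g. a NameNode still initializing). The iterator fallback in
XMLBlockInfo is the one worth calling out:

  // Fallback used above when no block manager is available yet.
  Iterator<DatanodeDescriptor> it = (blockManager != null)
      ? blockManager.datanodeIterator(block)
      : Collections.<DatanodeDescriptor>emptyList().iterator();

Collections.emptyIterator() would be the direct spelling, but it is a Java 7
addition; the emptyList().iterator() form keeps the file compatible with Java
6, which this codebase still supported at the time.
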
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1532952&r1=1532951&r2=1532952&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Thu Oct 17 02:14:33 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
@@ -43,4 +44,6 @@ public interface Namesystem extends RwLo
   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
   public void checkOperation(OperationCategory read) throws StandbyException;
+
+  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
 }
\ No newline at end of file
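
The Namesystem interface also grows isInSnapshot(BlockInfoUnderConstruction),
letting the blockmanagement layer ask whether an under-construction block
belongs to a file captured in a snapshot. The FSNamesystem implementation is
not part of this section of the diff; a purely hypothetical stub illustrating
the contract an implementor must satisfy:

  // Hypothetical stub only; the real implementation inspects the block's
  // BlockCollection/inode to decide whether it lives under a snapshot.
  @Override
  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
    return false;   // placeholder: treat nothing as snapshotted
  }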