You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [21/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Jun 21 06:37:27 2013
@@ -19,25 +19,33 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are guarded by {@link FSNamesystem} intrinsic lock.
+ */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
-private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final UserGroupInformation ugi;
+  private final String user;
+  /** A set with group namess. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
@@ -47,9 +55,9 @@ private final UserGroupInformation ugi;
       throw new AccessControlException(e); 
     } 
 
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
-
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
 
@@ -59,20 +67,23 @@ private final UserGroupInformation ugi;
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
 
+  public String getUser() {
+    return user;
+  }
+
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+
   /**
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
   }
 
@@ -102,8 +113,10 @@ private final UserGroupInformation ugi;
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
+   * 
+   * Guarded by {@link FSNamesystem} intrinsic lock
+   * Caller of this method must hold that lock.
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -142,6 +155,7 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkOwner(INode inode) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName())) {
       return;
@@ -149,6 +163,7 @@ private final UserGroupInformation ugi;
     throw new AccessControlException("Permission denied");
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkTraverse(INode[] inodes, int last
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
@@ -156,6 +171,7 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void checkSubAccess(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
@@ -175,11 +191,13 @@ private final UserGroupInformation ugi;
     }
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void check(INode[] inodes, int i, FsAction access
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, access);
   }
 
+  /** Guarded by {@link FSNamesystem} intrinsic lock */
   private void check(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null) {

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Fri Jun 21 06:37:27 2013
@@ -57,7 +57,7 @@ public class FileChecksumServlets {
       final UserGroupInformation ugi = getUGI(request, conf);
       String tokenString = request.getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
       final NameNode namenode = (NameNode)context.getAttribute("name.node");
-      final DatanodeID datanode = namenode.namesystem.getRandomDatanode();
+      final DatanodeID datanode = namenode.getNamesystem().getRandomDatanode();
       try {
         final URI uri = 
           createRedirectUri("/getFileChecksum", ugi, datanode, request, tokenString);

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Fri Jun 21 06:37:27 2013
@@ -55,8 +55,8 @@ public class FsckServlet extends DfsServ
         @Override
         public Object run() throws Exception {
           final NameNode nn = (NameNode) context.getAttribute("name.node");
-          final int totalDatanodes = nn.namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE); 
-          final short minReplication = nn.namesystem.getMinReplication();
+          final int totalDatanodes = nn.getNamesystem().getNumberOfDatanodes(DatanodeReportType.LIVE); 
+          final short minReplication = nn.getNamesystem().getMinReplication();
 
           new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
               totalDatanodes, minReplication, remoteAddress).fsck();

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Fri Jun 21 06:37:27 2013
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -66,12 +67,13 @@ public class GetImageServlet extends Htt
       final FSImage nnImage = (FSImage)context.getAttribute("name.system.image");
       final TransferFsImage ff = new TransferFsImage(pmap, request, response);
       final Configuration conf = (Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
+
       if(UserGroupInformation.isSecurityEnabled() && 
-          !isValidRequestor(request.getRemoteUser(), conf)) {
+          !isValidRequestor(request.getUserPrincipal().getName(), conf)) {
         response.sendError(HttpServletResponse.SC_FORBIDDEN, 
             "Only Namenode and Secondary Namenode may access this servlet");
         LOG.warn("Received non-NN/SNN request for image or edits from " 
-            + request.getRemoteHost());
+            + request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
         return;
       }
       
@@ -82,11 +84,11 @@ public class GetImageServlet extends Htt
           if (ff.getImage()) {
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(),
-                                          nnImage.getFsImageName()); 
+                nnImage.getFsImageName(), getThrottler(conf)); 
           } else if (ff.getEdit()) {
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(),
-                                          nnImage.getFsEditName());
+                nnImage.getFsEditName(), getThrottler(conf));
           } else if (ff.putImage()) {
             synchronized (fsImageTransferLock) {
               final MD5Hash expectedChecksum = ff.getNewChecksum();
@@ -120,20 +122,36 @@ public class GetImageServlet extends Htt
           return UserGroupInformation
           .loginUserFromKeytabAndReturnUGI(
                   SecurityUtil.getServerPrincipal(conf
-                      .get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), NameNode
+                      .get(DFS_NAMENODE_USER_NAME_KEY), NameNode
                       .getAddress(conf).getHostName()),
               conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
         }
       });
 
-    } catch (Exception ie) {
-      String errMsg = "GetImage failed. " + StringUtils.stringifyException(ie);
+    } catch (Throwable t) {
+      String errMsg = "GetImage failed. " + StringUtils.stringifyException(t);
       response.sendError(HttpServletResponse.SC_GONE, errMsg);
       throw new IOException(errMsg);
     } finally {
       response.getOutputStream().close();
     }
   }
+
+  /**
+   * Construct a throttler from conf
+   * @param conf configuration
+   * @return a data transfer throttler
+   */
+  private final DataTransferThrottler getThrottler(Configuration conf) {
+    long transferBandwidth = 
+      conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
+                   DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
+    DataTransferThrottler throttler = null;
+    if (transferBandwidth > 0) {
+      throttler = new DataTransferThrottler(transferBandwidth);
+    }
+    return throttler;
+  }
   
   private boolean isValidRequestor(String remoteUser, Configuration conf)
       throws IOException {
@@ -143,25 +161,19 @@ public class GetImageServlet extends Htt
     }
     
     String[] validRequestors = {
-        SecurityUtil.getServerPrincipal(conf
-            .get(DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), NameNode.getAddress(
-            conf).getHostName()),
         SecurityUtil.getServerPrincipal(conf.get(DFS_NAMENODE_USER_NAME_KEY),
             NameNode.getAddress(conf).getHostName()),
         SecurityUtil.getServerPrincipal(conf
-            .get(DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY),
-            SecondaryNameNode.getHttpAddress(conf).getHostName()),
-        SecurityUtil.getServerPrincipal(conf
             .get(DFS_SECONDARY_NAMENODE_USER_NAME_KEY), SecondaryNameNode
             .getHttpAddress(conf).getHostName()) };
     
     for(String v : validRequestors) {
       if(v != null && v.equals(remoteUser)) {
-        if(LOG.isDebugEnabled()) LOG.debug("isValidRequestor is allowing: " + remoteUser);
+        LOG.info("GetImageServlet allowing: " + remoteUser);
         return true;
       }
     }
-    if(LOG.isDebugEnabled()) LOG.debug("isValidRequestor is rejecting: " + remoteUser);
+    LOG.info("GetImageServlet rejecting: " + remoteUser);
     return false;
   }
 }

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java Fri Jun 21 06:37:27 2013
@@ -185,4 +185,14 @@ class Host2NodesMap {
       hostmapLock.readLock().unlock();
     }
   }
+  
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+        .append("[");
+    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
+      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    }
+    return b.append("\n]").toString();
+  }
 }

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Jun 21 06:37:27 2013
@@ -88,7 +88,7 @@ class INodeDirectoryWithQuota extends IN
    * @param dsQuota diskspace quota to be set
    *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+  void setQuota(long newNsQuota, long newDsQuota) {
     nsQuota = newNsQuota;
     dsQuota = newDsQuota;
   }
@@ -122,6 +122,15 @@ class INodeDirectoryWithQuota extends IN
     diskspace += dsDelta;
   }
   
+  /** Update the size of the tree
+   * 
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
+   */
+  void addSpaceConsumed(long nsDelta, long dsDelta) {
+    setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta);
+  }
+  
   /** 
    * Sets namespace and diskspace take by the directory rooted 
    * at this INode. This should be used carefully. It does not check 

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Jun 21 06:37:27 2013
@@ -112,6 +112,26 @@ class INodeFile extends INode {
   }
 
   /**
+   * append array of blocks to this.blocks
+   */
+  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+    int size = this.blocks.length;
+    
+    BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+    System.arraycopy(this.blocks, 0, newlist, 0, size);
+    
+    for(INodeFile in: inodes) {
+      System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+      size += in.blocks.length;
+    }
+    
+    for(BlockInfo bi: this.blocks) {
+      bi.setINode(this);
+    }
+    this.blocks = newlist;
+  }
+  
+  /**
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
@@ -136,8 +156,11 @@ class INodeFile extends INode {
 
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (Block blk : blocks) {
-      v.add(blk);
+    if (blocks != null && v != null) {
+      for (BlockInfo blk : blocks) {
+        v.add(blk);
+        blk.setINode(null);
+      }
     }
     blocks = null;
     return 1;
@@ -170,6 +193,9 @@ class INodeFile extends INode {
   
   long diskspaceConsumed(Block[] blkArr) {
     long size = 0;
+    if(blkArr == null) 
+      return 0;
+    
     for (Block blk : blkArr) {
       if (blk != null) {
         size += blk.getNumBytes();

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java Fri Jun 21 06:37:27 2013
@@ -65,6 +65,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 
 public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
@@ -506,7 +507,6 @@ public class JspHelper {
           ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
         }
         ugi.addToken(token);
-        ugi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
       } else {
         if(remoteUser == null) {
           throw new IOException("Security enabled but user not " +
@@ -626,4 +626,23 @@ public class JspHelper {
   public static int getDefaultChunkSize(Configuration conf) {
     return conf.getInt("dfs.default.chunk.view.size", 32 * 1024);
   }
+  
+  /** Return a table containing version information. */
+  static String getVersionTable(FSNamesystem fsn) {
+    return "<div class='dfstable'><table>"
+        + "\n  <tr><td class='col1'>Started:</td><td>" + fsn.getStartTime()
+        + "</td></tr>\n" + "\n  <tr><td class='col1'>Version:</td><td>"
+        + VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
+        + "</td></tr>\n" + "\n  <tr><td class='col1'>Compiled:</td><td>"
+        + VersionInfo.getDate() + " by " + VersionInfo.getUser()
+        + "</td></tr>\n</table></div>";
+  }
+
+  /** Return a table containing version information. */
+  public static String getVersionTable() {
+    return "<div id='dfstable'><table>"       
+        + "\n  <tr><td id='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
+        + "\n  <tr><td id='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser()
+        + "\n</table></div>";
+  }
 }

Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Context data for an ongoing NameNode recovery process. */
+public final class MetaRecoveryContext  {
+  public static final Log LOG = LogFactory.getLog(MetaRecoveryContext.class.getName());
+  private int force;
+  public static final int FORCE_NONE = 0;
+  public static final int FORCE_FIRST_CHOICE = 1;
+  public static final int FORCE_ALL = 2;
+
+  public MetaRecoveryContext(int force) {
+    this.force = force;
+  }
+  /** Display a prompt to the user and get his or her choice.
+   *  
+   * @param prompt      The prompt to display
+   * @param c1          Choice 1
+   * @param choices     Other choies
+   *
+   * @return            The choice that was taken
+   * @throws IOException
+   */
+  public String ask(String prompt, String firstChoice, String... choices) 
+      throws IOException {
+    while (true) {
+      LOG.error(prompt);
+      if (force > FORCE_NONE) {
+        LOG.info("Automatically choosing " + firstChoice);
+        return firstChoice;
+      }
+      StringBuilder responseBuilder = new StringBuilder();
+      while (true) {
+        int c = System.in.read();
+        if (c == -1 || c == '\r' || c == '\n') {
+          break;
+        }
+        responseBuilder.append((char)c);
+      }
+      String response = responseBuilder.toString();
+      if (response.equalsIgnoreCase(firstChoice)) {
+        return firstChoice;
+      }
+      for (String c : choices) {
+        if (response.equalsIgnoreCase(c)) {
+          return c;
+        }
+      }
+      LOG.error("I'm sorry, I cannot understand your response.\n");
+    }
+  }
+  /** Log a message and quit */
+  public void quit() {
+    LOG.error("Exiting on user request.");
+    System.exit(0);
+  }
+
+  static public void editLogLoaderPrompt(String prompt,
+      MetaRecoveryContext recovery) throws IOException
+  {
+    if (recovery == null) {
+      throw new IOException(prompt);
+    }
+    LOG.error(prompt);
+    String answer = recovery.ask(
+      "\nEnter 's' to stop reading the edit log here, abandoning any later " +
+        "edits.\n" +
+      "Enter 'q' to quit without saving.\n" +
+      "(s/q)", "s", "q");
+    if (answer.equals("s")) {
+      LOG.error("We will stop reading the edits log here.  "
+          + "NOTE: Some edits have been lost!");
+      return;
+    } else if (answer.equals("q")) {
+      recovery.quit();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameCache.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameCache.java Fri Jun 21 06:37:27 2013
@@ -152,4 +152,14 @@ class NameCache<K> {
     cache.put(name, name);
     lookups += useThreshold;
   }
+  
+  public void reset() {
+    initialized = false;
+    cache.clear();
+    if (transientMap == null) {
+      transientMap = new HashMap<K, UseCount>();
+    } else {
+      transientMap.clear();
+    }
+  }
 }

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -82,13 +85,17 @@ import org.apache.hadoop.security.Groups
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**********************************************************
  * NameNode serves as both directory namespace manager and
@@ -154,7 +161,7 @@ public class NameNode implements ClientP
 
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
-  public FSNamesystem namesystem; // TODO: This should private. Use getNamesystem() instead. 
+  private FSNamesystem namesystem;
   /** RPC server */
   private Server server;
   /** RPC server for HDFS Services communication.
@@ -177,15 +184,22 @@ public class NameNode implements ClientP
   private boolean stopRequested = false;
   /** Is service level authorization enabled? */
   private boolean serviceAuthEnabled = false;
+  /** Activated plug-ins. */
+  private List<ServicePlugin> plugins;
   
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
   public static void format(Configuration conf) throws IOException {
-    format(conf, false);
+    format(conf, false, true);
   }
 
   static NameNodeInstrumentation myMetrics;
 
+  /* Should only be used for test */
+  public void setNamesystem(FSNamesystem ns) {
+    namesystem = ns;
+  }
+  
   public FSNamesystem getNamesystem() {
     return namesystem;
   }
@@ -273,13 +287,25 @@ public class NameNode implements ClientP
     if (serviceAuthEnabled = 
           conf.getBoolean(
             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
+      PolicyProvider policyProvider = 
+          (PolicyProvider)(ReflectionUtils.newInstance(
+              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                  HDFSPolicyProvider.class, PolicyProvider.class), 
+              conf));
+        ServiceAuthorizationManager.refresh(conf, policyProvider);
     }
     
     myMetrics = NameNodeInstrumentation.create(conf);
     this.namesystem = new FSNamesystem(this, conf);
+    
+    // For testing purposes, allow the DT secret manager to be started regardless
+    // of whether security is enabled.
+    boolean alwaysUseDelegationTokensForTests = 
+      conf.getBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
 
-    if (UserGroupInformation.isSecurityEnabled()) {
+    if (UserGroupInformation.isSecurityEnabled() ||
+        alwaysUseDelegationTokensForTests) {
       namesystem.activateSecretManager();
     }
 
@@ -298,7 +324,9 @@ public class NameNode implements ClientP
     this.server = RPC.getServer(this, socAddr.getHostName(),
         socAddr.getPort(), handlerCount, false, conf, namesystem
         .getDelegationTokenSecretManager());
-
+    // Set terse exception whose stack trace won't be logged
+    this.server.addTerseExceptions(SafeModeException.class);
+    
     // The rpc-server port can be ephemeral... ensure we have the correct info
     this.serverAddress = this.server.getListenerAddress(); 
     FileSystem.setDefaultUri(conf, getUri(serverAddress));
@@ -312,6 +340,15 @@ public class NameNode implements ClientP
       serviceRpcServer.start();      
     }
     startTrashEmptier(conf);
+    
+    plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
+    for (ServicePlugin p: plugins) {
+      try {
+        p.start(this);
+      } catch (Throwable t) {
+        LOG.warn("ServicePlugin " + p + " could not be started", t);
+      }
+    }
   }
 
   private void startTrashEmptier(Configuration conf) throws IOException {
@@ -322,36 +359,37 @@ public class NameNode implements ClientP
 
   @SuppressWarnings("deprecation")
   public static String getInfoServer(Configuration conf) {
-    String http = UserGroupInformation.isSecurityEnabled() ? "dfs.https.address"
-        : "dfs.http.address";
+    String http = SecurityUtil.useKsslAuth() ? "dfs.https.address" :
+        "dfs.http.address";
     return NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
         "dfs.info.port", http);
   }
+
+  /**
+   * @return "https" if KSSL should be used, "http" if security is disabled
+   *         or SPNEGO is enabled.
+   */
+  public static String getHttpUriScheme() {
+    return SecurityUtil.useKsslAuth() ? "https" : "http";
+  }
   
   @SuppressWarnings("deprecation")
   private void startHttpServer(final Configuration conf) throws IOException {
     final String infoAddr = NetUtils.getServerAddress(conf,
         "dfs.info.bindAddress", "dfs.info.port", "dfs.http.address");
     final InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-    if(UserGroupInformation.isSecurityEnabled()) {
+    
+    if (SecurityUtil.useKsslAuth()) {
       String httpsUser = SecurityUtil.getServerPrincipal(conf
           .get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY), infoSocAddr
           .getHostName());
-      if (httpsUser == null) {
-        LOG.warn(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY
-            + " not defined in config. Starting http server as "
-            + SecurityUtil.getServerPrincipal(conf
-                .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), serverAddress
-                .getHostName())
-            + ": Kerberized SSL may be not function correctly.");
-      } else {
-        // Kerberized SSL servers must be run from the host principal...
-        LOG.info("Logging in as " + httpsUser + " to start http server.");
-        SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
-            DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoSocAddr
-                .getHostName());
-      }
+      // Kerberized SSL servers must be run from the host principal...
+      LOG.info("Logging in as " + httpsUser + " to start http server.");
+      SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
+          DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY, infoSocAddr
+              .getHostName());
     }
+
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
     try {
       this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
@@ -363,6 +401,33 @@ public class NameNode implements ClientP
               infoPort == 0, conf, 
               SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN)) {
             {
+              // Add SPNEGO support to NameNode
+              if (UserGroupInformation.isSecurityEnabled() &&
+                  !SecurityUtil.useKsslAuth()) {
+                Map<String, String> params = new HashMap<String, String>();
+                String principalInConf = conf.get(
+                    DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY);
+                if (principalInConf != null && !principalInConf.isEmpty()) {
+                  params.put("kerberos.principal",
+                      SecurityUtil.getServerPrincipal(principalInConf,
+                          serverAddress.getHostName()));
+                }
+                String httpKeytab = conf.get(
+                  DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
+                if (httpKeytab == null) {
+                  httpKeytab = 
+                    conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
+                }
+                if (httpKeytab != null && !httpKeytab.isEmpty()) {
+                  params.put("kerberos.keytab", httpKeytab);
+                }
+  
+                params.put(AuthenticationFilter.AUTH_TYPE, "kerberos");
+  
+                defineFilter(webAppContext, SPNEGO_FILTER,
+                    AuthenticationFilter.class.getName(), params, null);
+              }
+  
               if (WebHdfsFileSystem.isEnabled(conf, LOG)) {
                 //add SPNEGO authentication filter for webhdfs
                 final String name = "SPNEGO";
@@ -404,8 +469,7 @@ public class NameNode implements ClientP
           };
           
           boolean certSSL = conf.getBoolean("dfs.https.enable", false);
-          boolean useKrb = UserGroupInformation.isSecurityEnabled();
-          if (certSSL || useKrb) {
+          if (certSSL || SecurityUtil.useKsslAuth()) {
             boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
             InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoHost + ":"+ conf.get(
                                 "dfs.https.port", infoHost + ":" + 0));
@@ -414,7 +478,8 @@ public class NameNode implements ClientP
               sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
                   "ssl-server.xml"));
             }
-            httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth, useKrb);
+            httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth,
+                SecurityUtil.useKsslAuth());
             // assume same ssl port for all datanodes
             InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
                 "dfs.datanode.https.address", infoHost + ":" + 50475));
@@ -425,29 +490,32 @@ public class NameNode implements ClientP
           httpServer.setAttribute("name.node.address", getNameNodeAddress());
           httpServer.setAttribute("name.system.image", getFSImage());
           httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
-          httpServer.addInternalServlet("getDelegationToken", 
-                                        GetDelegationTokenServlet.PATH_SPEC, 
-                                        GetDelegationTokenServlet.class, true);
-          httpServer.addInternalServlet("renewDelegationToken", 
-                                        RenewDelegationTokenServlet.PATH_SPEC, 
-                                        RenewDelegationTokenServlet.class, true);
-          httpServer.addInternalServlet("cancelDelegationToken", 
-                                        CancelDelegationTokenServlet.PATH_SPEC, 
+          httpServer.addInternalServlet("getDelegationToken",
+                                        GetDelegationTokenServlet.PATH_SPEC,
+                                        GetDelegationTokenServlet.class, true,
+                                        SecurityUtil.useKsslAuth());
+          httpServer.addInternalServlet("renewDelegationToken",
+                                        RenewDelegationTokenServlet.PATH_SPEC,
+                                        RenewDelegationTokenServlet.class, true,
+                                        SecurityUtil.useKsslAuth());
+          httpServer.addInternalServlet("cancelDelegationToken",
+                                        CancelDelegationTokenServlet.PATH_SPEC,
                                         CancelDelegationTokenServlet.class,
-                                        true);
-          httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
-          httpServer.addInternalServlet("getimage", "/getimage", 
-              GetImageServlet.class, true);
-          httpServer.addInternalServlet("listPaths", "/listPaths/*", 
-              ListPathsServlet.class, false);
-          httpServer.addInternalServlet("data", "/data/*", 
-              FileDataServlet.class, false);
+                                        true, SecurityUtil.useKsslAuth());
+          httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true,
+              SecurityUtil.useKsslAuth());
+          httpServer.addInternalServlet("getimage", "/getimage",
+              GetImageServlet.class, true, SecurityUtil.useKsslAuth());
+          httpServer.addInternalServlet("listPaths", "/listPaths/*",
+              ListPathsServlet.class);
+          httpServer.addInternalServlet("data", "/data/*",
+              FileDataServlet.class);
           httpServer.addInternalServlet("checksum", "/fileChecksum/*",
-              FileChecksumServlets.RedirectServlet.class, false);
+              FileChecksumServlets.RedirectServlet.class);
           httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
-              ContentSummaryServlet.class, false);
+              ContentSummaryServlet.class);
           httpServer.start();
-      
+
           // The web-server port can be ephemeral... ensure we have the correct info
           infoPort = httpServer.getPort();
           httpAddress = new InetSocketAddress(infoHost, infoPort);
@@ -459,8 +527,7 @@ public class NameNode implements ClientP
     } catch (InterruptedException e) {
       throw new IOException(e);
     } finally {
-      if(UserGroupInformation.isSecurityEnabled() && 
-          conf.get(DFSConfigKeys.DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY) != null) {
+      if (SecurityUtil.useKsslAuth()) {
         // Go back to being the correct Namenode principal
         LOG.info("Logging back in as "
             + SecurityUtil.getServerPrincipal(conf
@@ -471,7 +538,7 @@ public class NameNode implements ClientP
                 .getHostName());
       }
     }
- }
+  }
 
   /**
    * Start NameNode.
@@ -481,6 +548,8 @@ public class NameNode implements ClientP
    * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
    * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
+   * <li>{@link StartupOption#RECOVER RECOVER} - recover name node
+   * metadata</li>
    * upgrade and create a snapshot of the current file system state</li> 
    * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the  
    *            cluster back to the previous state</li>
@@ -522,6 +591,15 @@ public class NameNode implements ClientP
     if (stopRequested)
       return;
     stopRequested = true;
+    if (plugins != null) {
+      for (ServicePlugin p : plugins) {
+        try {
+          p.stop();
+        } catch (Throwable t) {
+          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+        }
+      }
+    }
     try {
       if (httpServer != null) httpServer.stop();
     } catch (Exception e) {
@@ -622,7 +700,7 @@ public class NameNode implements ClientP
                              ) throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.create: file "
+      stateChangeLog.debug("*DIR* NameNode.create: "
                          +src+" for "+clientName+" at "+clientMachine);
     }
     if (!checkPathLength(src)) {
@@ -641,7 +719,7 @@ public class NameNode implements ClientP
   public LocatedBlock append(String src, String clientName) throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.append: file "
+      stateChangeLog.debug("*DIR* NameNode.append: "
           +src+" for "+clientName+" at "+clientMachine);
     }
     LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
@@ -650,6 +728,11 @@ public class NameNode implements ClientP
   }
 
   /** {@inheritDoc} */
+  public boolean isFileClosed(String src) throws IOException {
+    return namesystem.isFileClosed(src);
+  }
+
+  /** {@inheritDoc} */
   public boolean recoverLease(String src, String clientName) throws IOException {
     String clientMachine = getClientMachine();
     return namesystem.recoverLease(src, clientName, clientMachine);
@@ -686,6 +769,7 @@ public class NameNode implements ClientP
                                String clientName,
                                DatanodeInfo[] excludedNodes)
     throws IOException {
+
     HashMap<Node, Node> excludedNodesSet = null;
     if (excludedNodes != null) {
       excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
@@ -694,7 +778,7 @@ public class NameNode implements ClientP
       }
     }
 
-    stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+    stateChangeLog.debug("*BLOCK* NameNode.addBlock: "
                          +src+" for "+clientName);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(
       src, clientName, excludedNodesSet);
@@ -709,7 +793,7 @@ public class NameNode implements ClientP
   public void abandonBlock(Block b, String src, String holder
       ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
-                         +b+" of file "+src);
+                         +b+" of "+src);
     if (!namesystem.abandonBlock(b, src, holder)) {
       throw new IOException("Cannot abandon block during write to " + src);
     }
@@ -724,7 +808,7 @@ public class NameNode implements ClientP
     } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
       return true;
     } else {
-      throw new IOException("Could not complete write to file " + src + " by " + clientName);
+      throw new IOException("Could not complete write to " + src + " by " + clientName);
     }
   }
 
@@ -763,6 +847,13 @@ public class NameNode implements ClientP
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }
+  
+  /** 
+   * {@inheritDoc}
+   */
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
     
   /**
    */
@@ -1143,8 +1234,7 @@ public class NameNode implements ClientP
    * @throws IOException
    */
   private static boolean format(Configuration conf,
-                                boolean isConfirmationNeeded
-                                ) throws IOException {
+      boolean isConfirmationNeeded, boolean isInteractive) throws IOException {
     Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     Collection<File> editDirsToFormat = 
                  FSNamesystem.getNamespaceEditsDirs(conf);
@@ -1153,6 +1243,10 @@ public class NameNode implements ClientP
       if (!curDir.exists())
         continue;
       if (isConfirmationNeeded) {
+        if (!isInteractive) {
+          System.err.println("Format aborted: " + curDir + " exists.");
+          return true;
+        }
         System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
         if (!(System.in.read() == 'Y')) {
           System.err.println("Format aborted in "+ curDir);
@@ -1219,11 +1313,14 @@ public class NameNode implements ClientP
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +
-      StartupOption.FORMAT.getName() + "] | [" +
+      StartupOption.FORMAT.getName()  + " [" + StartupOption.FORCE.getName() +  
+      " ] ["+StartupOption.NONINTERACTIVE.getName()+"]] | [" +
       StartupOption.UPGRADE.getName() + "] | [" +
       StartupOption.ROLLBACK.getName() + "] | [" +
       StartupOption.FINALIZE.getName() + "] | [" +
-      StartupOption.IMPORT.getName() + "]");
+      StartupOption.IMPORT.getName() + "] | [" + 
+      StartupOption.RECOVER.getName() +
+        " [ " + StartupOption.FORCE.getName() + " ] ]");
   }
 
   private static StartupOption parseArguments(String args[]) {
@@ -1233,10 +1330,34 @@ public class NameNode implements ClientP
       String cmd = args[i];
       if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.FORMAT;
+        // check if there are other options
+        for (i = i + 1; i < argsLen; i++) {
+          if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
+            startOpt.setConfirmationNeeded(false);
+          }
+          if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
+            startOpt.setInteractive(false);
+          }
+        }
       } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.REGULAR;
       } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.UPGRADE;
+      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
+        if (startOpt != StartupOption.REGULAR) {
+          throw new RuntimeException("Can't combine -recover with " +
+              "other startup options.");
+        }
+        startOpt = StartupOption.RECOVER;
+        while (++i < argsLen) {
+          if (args[i].equalsIgnoreCase(
+                StartupOption.FORCE.getName())) {
+            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
+          } else {
+            throw new RuntimeException("Error parsing recovery options: " + 
+              "can't understand option \"" + args[i] + "\"");
+          }
+        }
       } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.ROLLBACK;
       } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
@@ -1258,6 +1379,78 @@ public class NameNode implements ClientP
                                           StartupOption.REGULAR.toString()));
   }
 
+  private static void doRecovery(StartupOption startOpt, Configuration conf)
+              throws IOException {
+    if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
+      if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
+          "This mode is intended to recover lost metadata on a corrupt " +
+          "filesystem.  Metadata recovery mode often permanently deletes " +
+          "data from your HDFS filesystem.  Please back up your edit log " +
+          "and image before trying this!\n\n" +
+          "Are you ready to proceed? (Y/N)\n")) {
+        System.err.println("Recovery aborted at user request.\n");
+        return;
+      }
+    }
+    final int tolerationLength = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_DEFAULT);
+    if (tolerationLength >= 0) {
+      if (!confirmPrompt("You have selected Metadata Recovery mode and have set "
+          + DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_KEY + " = "
+          + tolerationLength + ".  However, Metadata Recovery mode and the"
+          + " Edit Log Toleration feature cannot be enabled at the same time."
+          + "  Disable Edit Log Toleration? (Y/N)\n")) {
+        System.err.println("Recovery aborted at user request.\n");
+        return;
+      }
+      conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_KEY, -1);
+    }
+
+    MetaRecoveryContext.LOG.info("starting recovery...");
+    Collection<File> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+    Collection<File> editDirs = 
+                 FSNamesystem.getNamespaceEditsDirs(conf);
+    FSNamesystem fsn = null;
+    try {
+      fsn = new FSNamesystem(new FSImage(namespaceDirs, editDirs), conf);
+      fsn.dir.fsImage.loadFSImage(startOpt.createRecoveryContext());
+      fsn.dir.fsImage.saveNamespace(true);
+      MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
+    } finally {
+      if (fsn != null)
+        fsn.close();
+    }
+  }
+  
+  /**
+   * Print out a prompt to the user, and return true if the user
+   * responds with "Y" or "yes".
+   */
+  static boolean confirmPrompt(String prompt) throws IOException {
+    while (true) {
+      System.err.print(prompt + " (Y or N) ");
+      StringBuilder responseBuilder = new StringBuilder();
+      while (true) {
+        int c = System.in.read();
+        if (c == -1 || c == '\r' || c == '\n') {
+          break;
+        }
+        responseBuilder.append((char)c);
+      }
+
+      String response = responseBuilder.toString();
+      if (response.equalsIgnoreCase("y") ||
+          response.equalsIgnoreCase("yes")) {
+        return true;
+      } else if (response.equalsIgnoreCase("n") ||
+          response.equalsIgnoreCase("no")) {
+        return false;
+      }
+      // else ask them again
+    }
+  }
+
   public static NameNode createNameNode(String argv[], 
                                  Configuration conf) throws IOException {
     if (conf == null)
@@ -1265,17 +1458,21 @@ public class NameNode implements ClientP
     StartupOption startOpt = parseArguments(argv);
     if (startOpt == null) {
       printUsage();
-      return null;
+      System.exit(-2);
     }
     setStartupOption(conf, startOpt);
 
     switch (startOpt) {
       case FORMAT:
-        boolean aborted = format(conf, true);
+        boolean aborted = format(conf, startOpt.getConfirmationNeeded(),
+            startOpt.getInteractive());
         System.exit(aborted ? 1 : 0);
       case FINALIZE:
         aborted = finalize(conf, true);
         System.exit(aborted ? 1 : 0);
+      case RECOVER:
+        NameNode.doRecovery(startOpt, conf);
+        return null;
       default:
     }
     DefaultMetricsSystem.initialize("NameNode");

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jun 21 06:37:27 2013
@@ -56,17 +56,8 @@ import org.apache.hadoop.security.UserGr
  *  root path. The following abnormal conditions are detected and handled:</p>
  * <ul>
  * <li>files with blocks that are completely missing from all datanodes.<br/>
- * In this case the tool can perform one of the following actions:
- *  <ul>
- *      <li>none ({@link #FIXING_NONE})</li>
- *      <li>move corrupted files to /lost+found directory on DFS
- *      ({@link #FIXING_MOVE}). Remaining data blocks are saved as a
- *      block chains, representing longest consecutive series of valid blocks.</li>
- *      <li>delete corrupted files ({@link #FIXING_DELETE})</li>
- *  </ul>
- *  </li>
- *  <li>detect files with under-replicated or over-replicated blocks</li>
- *  </ul>
+ * <li>files with under-replicated or over-replicated blocks</li>
+ * </ul>
  *  Additionally, the tool collects a detailed overall DFS statistics, and
  *  optionally can print detailed statistics on block locations and replication
  *  factors of each file.
@@ -80,13 +71,6 @@ public class NamenodeFsck {
   public static final String NONEXISTENT_STATUS = "does not exist";
   public static final String FAILURE_STATUS = "FAILED";
   
-  /** Don't attempt any fixing . */
-  public static final int FIXING_NONE = 0;
-  /** Move corrupted files to /lost+found . */
-  public static final int FIXING_MOVE = 1;
-  /** Delete corrupted files. */
-  public static final int FIXING_DELETE = 2;
-  
   private final NameNode namenode;
   private final NetworkTopology networktopology;
   private final int totalDatanodes;
@@ -101,7 +85,21 @@ public class NamenodeFsck {
   private boolean showBlocks = false;
   private boolean showLocations = false;
   private boolean showRacks = false;
-  private int fixing = FIXING_NONE;
+
+  /** 
+   * True if the user specified the -move option.
+   *
+   * Whe this option is in effect, we will copy salvaged blocks into the lost
+   * and found. */
+  private boolean doMove = false;
+
+  /** 
+   * True if the user specified the -delete option.
+   *
+   * Whe this option is in effect, we will delete corrupted files.
+   */
+  private boolean doDelete = false;
+
   private String path = "/";
   
   private final Configuration conf;
@@ -133,8 +131,8 @@ public class NamenodeFsck {
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }
-      else if (key.equals("move")) { this.fixing = FIXING_MOVE; }
-      else if (key.equals("delete")) { this.fixing = FIXING_DELETE; }
+      else if (key.equals("move")) { this.doMove = true; }
+      else if (key.equals("delete")) { this.doDelete = true; }
       else if (key.equals("files")) { this.showFiles = true; }
       else if (key.equals("blocks")) { this.showBlocks = true; }
       else if (key.equals("locations")) { this.showLocations = true; }
@@ -219,7 +217,7 @@ public class NamenodeFsck {
     // Get block locations without updating the file access time 
     // and without block access tokens
     LocatedBlocks blocks = namenode.getNamesystem().getBlockLocations(path, 0,
-        fileLen, false, false);
+        fileLen, false, false, false);
     if (blocks == null) { // the file is deleted
       return;
     }
@@ -284,7 +282,7 @@ public class NamenodeFsck {
       }
       // verify block placement policy
       int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
-                          verifyBlockPlacement(path, lBlk, targetFileReplication);
+                          verifyBlockPlacement(path, lBlk, (short)Math.min(2,targetFileReplication));
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
@@ -328,16 +326,20 @@ public class NamenodeFsck {
             + " blocks of total size " + missize + " B.");
       }
       res.corruptFiles++;
-      switch(fixing) {
-      case FIXING_NONE:
-        break;
-      case FIXING_MOVE:
-        if (!isOpen)
-          lostFoundMove(parent, file, blocks);
-        break;
-      case FIXING_DELETE:
-        if (!isOpen)
-          namenode.delete(path, true);
+      try {
+        if (doMove) {
+          if (!isOpen) {
+            copyBlocksToLostFound(parent, file, blocks);
+          }
+        }
+        if (doDelete) {
+          if (!isOpen) {
+            LOG.warn("\n - deleting corrupted file " + path);
+            namenode.delete(path, true);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("error processing " + path + ": " + e.toString());
       }
     }
     if (showFiles) {
@@ -352,8 +354,8 @@ public class NamenodeFsck {
     }
   }
   
-  private void lostFoundMove(String parent, HdfsFileStatus file, LocatedBlocks blocks)
-    throws IOException {
+  private void copyBlocksToLostFound(String parent, HdfsFileStatus file,
+        LocatedBlocks blocks) throws IOException {
     final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
     try {
     if (!lfInited) {
@@ -386,12 +388,10 @@ public class NamenodeFsck {
         }
         if (fos == null) {
           fos = dfs.create(target + "/" + chain, true);
-          if (fos != null) chain++;
+          if (fos != null)
+            chain++;
           else {
-            LOG.warn(errmsg + ": could not store chain " + chain);
-            // perhaps we should bail out here...
-            // return;
-            continue;
+            throw new IOException(errmsg + ": could not store chain " + chain);
           }
         }
         
@@ -408,8 +408,7 @@ public class NamenodeFsck {
         }
       }
       if (fos != null) fos.close();
-      LOG.warn("\n - moved corrupted file " + fullName + " to /lost+found");
-      dfs.delete(fullName, true);
+      LOG.warn("\n - copied corrupted file " + fullName + " to /lost+found");
     }  catch (Exception e) {
       e.printStackTrace();
       LOG.warn(errmsg + ": " + e.getMessage());

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Fri Jun 21 06:37:27 2013
@@ -68,7 +68,7 @@ class PendingReplicationBlocks {
   /**
    * Add a block to the list of pending Replications
    */
-  void add(Block block, int numReplicas) {
+  void increment(Block block, int numReplicas) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -85,7 +85,7 @@ class PendingReplicationBlocks {
    * Decrement the number of pending replication requests
    * for this block.
    */
-  void remove(Block block) {
+  void decrement(Block block) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -97,6 +97,17 @@ class PendingReplicationBlocks {
       }
     }
   }
+  
+  /**
+   * Remove the record about the given block from pendingReplications.
+   * @param block The given block whose pending replication requests need to be
+   *              removed
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      pendingReplications.remove(block);
+    }
+  }
 
   /**
    * The total number of blocks that are undergoing replication

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PermissionChecker.java Fri Jun 21 06:37:27 2013
@@ -98,7 +98,6 @@ class PermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Jun 21 06:37:27 2013
@@ -17,27 +17,29 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.protocol.FSConstants.BUFFER_SIZE;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.protocol.FSConstants.BUFFER_SIZE;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.MD5Hash;
@@ -48,6 +50,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 
@@ -73,6 +76,9 @@ public class SecondaryNameNode implement
   public static final Log LOG = 
     LogFactory.getLog(SecondaryNameNode.class.getName());
 
+  private final long starttime = System.currentTimeMillis();
+  private volatile long lastCheckpointTime = 0;
+
   private String fsName;
   private CheckpointStorage checkpointImage;
 
@@ -88,8 +94,20 @@ public class SecondaryNameNode implement
   private Collection<File> checkpointDirs;
   private Collection<File> checkpointEditsDirs;
   private long checkpointPeriod;	// in seconds
-  private long checkpointSize;    // size (in MB) of current Edit Log
+  private long checkpointSize;    // size (in bytes) of current Edit Log
 
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + " Status" 
+      + "\nName Node Address    : " + nameNodeAddr   
+      + "\nStart Time           : " + new Date(starttime)
+      + "\nLast Checkpoint Time : " + (lastCheckpointTime == 0? "--": new Date(lastCheckpointTime))
+      + "\nCheckpoint Period    : " + checkpointPeriod + " seconds"
+      + "\nCheckpoint Size      : " + StringUtils.byteDesc(checkpointSize)
+                                    + " (= " + checkpointSize + " bytes)" 
+      + "\nCheckpoint Dirs      : " + checkpointDirs
+      + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs;
+  }
   /**
    * Utility class to facilitate junit test error simulation.
    */
@@ -194,6 +212,74 @@ public class SecondaryNameNode implement
     checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
 
     // initialize the webserver for uploading files.
+    if (SecurityUtil.useKsslAuth()) {
+      initializeKsslWebServer(infoSocAddr);
+    } else {
+      initializeHttpWebServer(infoSocAddr);
+    }
+
+    LOG.info("Web server init done");
+    // The web-server port can be ephemeral... ensure we have the correct info
+    
+    infoPort = infoServer.getPort();
+    if (!SecurityUtil.useKsslAuth()) {
+      imagePort = infoPort;
+    }
+    conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort); 
+    LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
+    LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
+             "(" + checkpointPeriod/60 + " min)");
+    LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
+             "(" + checkpointSize/1024 + " KB)");
+  }
+
+  private void initializeHttpWebServer(final InetSocketAddress infoSocAddr)
+      throws IOException {
+    int tmpInfoPort = infoSocAddr.getPort();
+    infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
+                                tmpInfoPort == 0, conf,
+                                SecurityUtil.getAdminAcls
+                                  (conf, DFSConfigKeys.DFS_ADMIN)) {
+        {
+          if (UserGroupInformation.isSecurityEnabled()) {
+            // Security is enabled, so use SPNEGO to authenticate.
+            Map<String, String> params = new HashMap<String, String>();
+            String principalInConf = 
+              conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY);
+            if (principalInConf != null && !principalInConf.isEmpty()) {
+              params.put("kerberos.principal",
+                         SecurityUtil.getServerPrincipal
+                           (principalInConf, infoSocAddr.getHostName()));
+            }
+            String httpKeytab = conf.get(
+                DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
+            if (null == httpKeytab) {
+              httpKeytab = conf.get(
+                  DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
+            }
+            if (httpKeytab != null && !httpKeytab.isEmpty()) {
+              params.put("kerberos.keytab", httpKeytab);
+            }
+
+            params.put(AuthenticationFilter.AUTH_TYPE, "kerberos");
+
+            defineFilter(webAppContext, SPNEGO_FILTER, 
+                         AuthenticationFilter.class.getName(),
+                         params, null);
+          }
+        }
+      };
+
+    infoServer.setAttribute("secondary.name.node", this);
+    infoServer.setAttribute("name.system.image", checkpointImage);
+    infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+    infoServer.addInternalServlet("getimage", "/getimage",
+        GetImageServlet.class, true, false);
+    infoServer.start();
+  }
+
+  private void initializeKsslWebServer(final InetSocketAddress infoSocAddr)
+      throws IOException {
     // Kerberized SSL servers must be run from the host principal...
     UserGroupInformation httpUGI = 
       UserGroupInformation.loginUserFromKeytabAndReturnUGI(
@@ -214,20 +300,18 @@ public class SecondaryNameNode implement
               tmpInfoPort == 0, conf, 
               SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN));
           
-          if(UserGroupInformation.isSecurityEnabled()) {
-            System.setProperty("https.cipherSuites", 
-                Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
-            InetSocketAddress secInfoSocAddr = 
-              NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.get(
-                "dfs.secondary.https.port", infoBindAddress + ":" + 0));
-            imagePort = secInfoSocAddr.getPort();
-            infoServer.addSslListener(secInfoSocAddr, conf, false, true);
-          }
-          
+          System.setProperty("https.cipherSuites", 
+              Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
+          InetSocketAddress secInfoSocAddr = 
+            NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.getInt(
+              "dfs.secondary.https.port", 50490));
+          imagePort = secInfoSocAddr.getPort();
+          infoServer.addSslListener(secInfoSocAddr, conf, false, true);
+  
           infoServer.setAttribute("name.system.image", checkpointImage);
           infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
           infoServer.addInternalServlet("getimage", "/getimage",
-              GetImageServlet.class, true);
+              GetImageServlet.class, true, true);
           infoServer.start();
           return infoServer;
         }
@@ -235,20 +319,6 @@ public class SecondaryNameNode implement
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    LOG.info("Web server init done");
-    // The web-server port can be ephemeral... ensure we have the correct info
-    
-    infoPort = infoServer.getPort();
-    if(!UserGroupInformation.isSecurityEnabled())
-      imagePort = infoPort;
-    
-    conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort); 
-    LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
-    LOG.info("Secondary image servlet up at: " + infoBindAddress + ":" + imagePort);
-    LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
-             "(" + checkpointPeriod/60 + " min)");
-    LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
-             "(" + checkpointSize/1024 + " KB)");
   }
 
   /**
@@ -300,7 +370,6 @@ public class SecondaryNameNode implement
     // pending edit log.
     //
     long period = 5 * 60;              // 5 minutes
-    long lastCheckpointTime = 0;
     if (checkpointPeriod < period) {
       period = checkpointPeriod;
     }
@@ -709,7 +778,7 @@ public class SecondaryNameNode implement
       if ((sdName == null) || (sdEdits == null))
         throw new IOException("Could not locate checkpoint directories");
       loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
-      loadFSEdits(sdEdits);
+      loadFSEdits(sdEdits, null);
       sig.validateStorageInfo(this);
       saveNamespace(false);
     }

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Fri Jun 21 06:37:27 2013
@@ -30,9 +30,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.ErrorSimulator;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * This class provides fetching a specified file from the NameNode.
@@ -125,7 +125,8 @@ class TransferFsImage implements FSConst
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  static void getFileServer(OutputStream outstream, File localfile) 
+  static void getFileServer(OutputStream outstream, File localfile,
+      DataTransferThrottler throttler) 
     throws IOException {
     byte buf[] = new byte[BUFFER_SIZE];
     FileInputStream infile = null;
@@ -144,6 +145,9 @@ class TransferFsImage implements FSConst
           break;
         }
         outstream.write(buf, 0, num);
+        if (throttler != null) {
+          throttler.throttle(num);
+        }
       }
     } finally {
       if (infile != null) {
@@ -161,19 +165,15 @@ class TransferFsImage implements FSConst
   static MD5Hash getFileClient(String fsName, String id, File[] localPath,
       boolean getChecksum) throws IOException {
     byte[] buf = new byte[BUFFER_SIZE];
-    String proto = UserGroupInformation.isSecurityEnabled() ? "https://" : "http://";
-    
-    StringBuffer str = new StringBuffer(proto+fsName+"/getimage?");
-    str.append(id);
 
+    String str = NameNode.getHttpUriScheme() + "://" + fsName + "/getimage?" + id;
+    LOG.info("Opening connection to " + str);
     //
     // open connection to remote server
     //
-    URL url = new URL(str.toString());
-    
-    // Avoid Krb bug with cross-realm hosts
-    SecurityUtil.fetchServiceTicket(url);
-    URLConnection connection = url.openConnection();
+    URL url = new URL(str);
+
+    URLConnection connection = SecurityUtil.openSecureHttpConnection(url);
     InputStream stream = connection.getInputStream();
     MessageDigest digester = null;
     if (getChecksum) {

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Fri Jun 21 06:37:27 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.B
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
   static final int LEVEL = 3;
+  static public final int QUEUE_WITH_CORRUPT_BLOCKS = 2;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
   /* constructor */
@@ -187,39 +188,55 @@ class UnderReplicatedBlocks implements I
     return new BlockIterator();
   }
   
-    class BlockIterator implements Iterator<Block> {
-      private int level;
-      private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
-      BlockIterator()  
-      {
-        level=0;
-        for(int i=0; i<LEVEL; i++) {
-          iterators.add(priorityQueues.get(i).iterator());
-        }
-      }
-              
-      private void update() {
-        while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
-          level++;
-        }
-      }
-              
-      public Block next() {
-        update();
-        return iterators.get(level).next();
-      }
-              
-      public boolean hasNext() {
-        update();
-        return iterators.get(level).hasNext();
+  /* returns an iterator of all blocks in a given priority queue */
+  private synchronized Iterable<Block> getQueue(int priority) {
+    if (priority < 0 || priority >= LEVEL) {
+      return null;
+    }
+    return priorityQueues.get(priority);
+  }
+  
+  /**
+   * @return an iterator of all the blocks in the QUEUE_WITH_CORRUPT_BLOCKS
+   *         priority queue
+   */
+  Iterable<Block> getCorruptQueue() {
+    return getQueue(QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+  
+  class BlockIterator implements Iterator<Block> {
+    private int level;
+    private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
+
+    BlockIterator() {
+      level = 0;
+      for (int i = 0; i < LEVEL; i++) {
+        iterators.add(priorityQueues.get(i).iterator());
       }
-              
-      public void remove() {
-        iterators.get(level).remove();
+    }
+
+    private void update() {
+      while (level < LEVEL - 1 && !iterators.get(level).hasNext()) {
+        level++;
       }
-      
-      public int getPriority() {
-        return level;
+    }
+
+    public Block next() {
+      update();
+      return iterators.get(level).next();
+    }
+
+    public boolean hasNext() {
+      update();
+      return iterators.get(level).hasNext();
+    }
+
+    public void remove() {
+      iterators.get(level).remove();
+    }
+
+    public int getPriority() {
+      return level;
     };
   }
 }

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Fri Jun 21 06:37:27 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.JsonUtil;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
@@ -107,6 +109,11 @@ public class NamenodeWebHdfsMethods {
     return REMOTE_ADDRESS.get();
   }
 
+  /** Set the remote client address. */
+  static void setRemoteAddress(String remoteAddress) {
+    REMOTE_ADDRESS.set(remoteAddress);
+  }
+
   private @Context ServletContext context;
   private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
@@ -126,12 +133,21 @@ public class NamenodeWebHdfsMethods {
     response.setContentType(null);
   }
 
-  private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op, final long openOffset
-      ) throws IOException {
-    if (op == GetOpParam.Op.OPEN
+  static DatanodeInfo chooseDatanode(final NameNode namenode,
+      final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize) throws IOException {
+    final FSNamesystem ns = namenode.getNamesystem();
+
+    if (op == PutOpParam.Op.CREATE) {
+      //choose a datanode near to client
+      final DatanodeInfo dn = ns.chooseDatanode(path, getRemoteAddress(), blocksize);
+      if (dn != null) {
+        return dn;
+      }
+    } else if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
+      //choose a datanode containing a replica
       final HdfsFileStatus status = namenode.getFileInfo(path);
       if (status == null) {
         throw new FileNotFoundException("File " + path + " not found.");
@@ -155,7 +171,7 @@ public class NamenodeWebHdfsMethods {
       }
     } 
 
-    return namenode.getNamesystem().getRandomDatanode();
+    return ns.getRandomDatanode();
   }
 
   private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -173,8 +189,10 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
+        blocksize);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
@@ -302,7 +320,7 @@ public class NamenodeWebHdfsMethods {
     case CREATE:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L,
+          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
@@ -384,10 +402,12 @@ public class NamenodeWebHdfsMethods {
           final DoAsParam doAsUser,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, username, doAsUser, ROOT, op, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -406,11 +426,13 @@ public class NamenodeWebHdfsMethods {
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -425,9 +447,14 @@ public class NamenodeWebHdfsMethods {
     case APPEND:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, bufferSize);
+          fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case CONCAT:
+    {
+      namenode.concat(fullpath, concatSrcs.getAbsolutePaths());
+      return Response.ok().build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
@@ -507,7 +534,7 @@ public class NamenodeWebHdfsMethods {
     case OPEN:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
+          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -543,7 +570,7 @@ public class NamenodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L);
+          fullpath, op.getValue(), -1L, -1L);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Fri Jun 21 06:37:27 2013
@@ -41,12 +41,9 @@ import org.apache.hadoop.security.Kerber
     clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 25: Serialized format of BlockTokenIdentifier changed to contain
-   *     multiple blocks within a single BlockTokenIdentifier
-   *     
-   *     (bumped to 25 to bring in line with trunk)
+   * 26: Added an additional member to NamespaceInfo
    */
-  public static final long versionID = 25L;
+  public static final long versionID = 26L;
   
   // error code
   final static int NOTIFY = 0;

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java Fri Jun 21 06:37:27 2013
@@ -23,12 +23,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.util.VersionInfo;
 
 /**
  * NamespaceInfo is returned by the name-node in reply 
@@ -36,22 +36,27 @@ import org.apache.hadoop.io.WritableFact
  * 
  */
 public class NamespaceInfo extends StorageInfo implements Writable {
-  String  buildVersion;
+  String revision;
+  String version;
   int distributedUpgradeVersion;
 
   public NamespaceInfo() {
     super();
-    buildVersion = null;
   }
   
   public NamespaceInfo(int nsID, long cT, int duVersion) {
-    super(FSConstants.LAYOUT_VERSION, nsID, cT);
-    buildVersion = Storage.getBuildVersion();
+    super(FSConstants.LAYOUT_VERSION, nsID, cT); 
+    version = VersionInfo.getVersion();
+    revision = VersionInfo.getRevision();
     this.distributedUpgradeVersion = duVersion;
   }
   
-  public String getBuildVersion() {
-    return buildVersion;
+  public String getVersion() {
+    return version;
+  }
+
+  public String getRevision() {
+    return revision;
   }
 
   public int getDistributedUpgradeVersion() {
@@ -70,7 +75,8 @@ public class NamespaceInfo extends Stora
   }
 
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, getBuildVersion());
+    UTF8.writeString(out, getVersion());
+    UTF8.writeString(out, getRevision());
     out.writeInt(getLayoutVersion());
     out.writeInt(getNamespaceID());
     out.writeLong(getCTime());
@@ -78,7 +84,8 @@ public class NamespaceInfo extends Stora
   }
 
   public void readFields(DataInput in) throws IOException {
-    buildVersion = UTF8.readString(in);
+    version = UTF8.readString(in);
+    revision = UTF8.readString(in);
     layoutVersion = in.readInt();
     namespaceID = in.readInt();
     cTime = in.readLong();