Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/09/27 00:55:20 UTC

svn commit: r1390763 [3/4] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/servlet/ hadoop-hdfs-httpfs/src/site/apt/ hadoop-hdfs-httpfs/src/test/java/org/apac...

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 26 22:55:00 2012
@@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -1071,7 +1072,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "setPermission", src, null, null);
       }
       throw e;
@@ -1100,7 +1101,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "setPermission", src, null, resultingStat);
     }
   }
@@ -1117,7 +1118,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "setOwner", src, null, null);
       }
       throw e;
@@ -1155,7 +1156,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "setOwner", src, null, resultingStat);
     }
   }
@@ -1172,6 +1173,14 @@ public class FSNamesystem implements Nam
     if (blocks != null) {
       blockManager.getDatanodeManager().sortLocatedBlocks(
           clientMachine, blocks.getLocatedBlocks());
+      
+      LocatedBlock lastBlock = blocks.getLastLocatedBlock();
+      if (lastBlock != null) {
+        ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
+        lastBlockList.add(lastBlock);
+        blockManager.getDatanodeManager().sortLocatedBlocks(
+                              clientMachine, lastBlockList);
+      }
     }
     return blocks;
   }
@@ -1190,7 +1199,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "open", src, null, null);
       }
       throw e;
@@ -1216,7 +1225,7 @@ public class FSNamesystem implements Nam
         offset, length, doAccessTime, needBlockToken);  
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "open", src, null, null);
     }
     if (checkSafeMode && isInSafeMode()) {
@@ -1301,7 +1310,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getLoginUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "concat", Arrays.toString(srcs), target, null);
       }
       throw e;
@@ -1351,7 +1360,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getLoginUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "concat", Arrays.toString(srcs), target, resultingStat);
     }
   }
@@ -1468,7 +1477,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "setTimes", src, null, null);
       }
       throw e;
@@ -1495,7 +1504,7 @@ public class FSNamesystem implements Nam
         if (auditLog.isInfoEnabled() && isExternalInvocation()) {
           final HdfsFileStatus stat = dir.getFileInfo(src, false);
           logAuditEvent(UserGroupInformation.getCurrentUser(),
-                        Server.getRemoteIp(),
+                        getRemoteIp(),
                         "setTimes", src, null, stat);
         }
       } else {
@@ -1517,7 +1526,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "createSymlink", link, target, null);
       }
       throw e;
@@ -1545,7 +1554,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "createSymlink", link, target, resultingStat);
     }
   }
@@ -1601,7 +1610,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "setReplication", src, null, null);
       }
       throw e;
@@ -1637,7 +1646,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "setReplication", src, null, null);
     }
     return isFile;
@@ -1694,7 +1703,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "create", src, null, null);
       }
       throw e;
@@ -1719,7 +1728,7 @@ public class FSNamesystem implements Nam
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "create", src, null, stat);
     }
   }
@@ -2017,7 +2026,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "append", src, null, null);
       }
       throw e;
@@ -2055,7 +2064,7 @@ public class FSNamesystem implements Nam
     }
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "append", src, null, null);
     }
     return lb;
@@ -2521,7 +2530,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "rename", src, dst, null);
       }
       throw e;
@@ -2550,7 +2559,7 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "rename", src, dst, resultingStat);
     }
     return status;
@@ -2610,7 +2619,7 @@ public class FSNamesystem implements Nam
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
+      logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
                     cmd.toString(), src, dst, resultingStat);
     }
   }
@@ -2648,7 +2657,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "delete", src, null, null);
       }
       throw e;
@@ -2664,7 +2673,7 @@ public class FSNamesystem implements Nam
     boolean status = deleteInternal(src, recursive, true);
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "delete", src, null, null);
     }
     return status;
@@ -2802,8 +2811,11 @@ public class FSNamesystem implements Nam
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException,
-           StandbyException {
+           StandbyException, IOException {
+    HdfsFileStatus stat = null;
+
     readLock();
+
     try {
       checkOperation(OperationCategory.READ);
 
@@ -2813,10 +2825,23 @@ public class FSNamesystem implements Nam
       if (isPermissionEnabled) {
         checkTraverse(src);
       }
-      return dir.getFileInfo(src, resolveLink);
+      stat = dir.getFileInfo(src, resolveLink);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      getRemoteIp(),
+                      "getfileinfo", src, null, null);
+      }
+      throw e;
     } finally {
       readUnlock();
     }
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    getRemoteIp(),
+                    "getfileinfo", src, null, null);
+    }
+    return stat;
   }
 
   /**
@@ -2829,7 +2854,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "mkdirs", src, null, null);
       }
       throw e;
@@ -2854,7 +2879,7 @@ public class FSNamesystem implements Nam
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
+                    getRemoteIp(),
                     "mkdirs", src, null, stat);
     }
     return status;
@@ -3295,7 +3320,7 @@ public class FSNamesystem implements Nam
     } catch (AccessControlException e) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "listStatus", src, null, null);
       }
       throw e;
@@ -3319,7 +3344,7 @@ public class FSNamesystem implements Nam
       }
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
+                      getRemoteIp(),
                       "listStatus", src, null, null);
       }
       dl = dir.getListing(src, startAfter, needLocation);
@@ -4398,6 +4423,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
+      checkSuperuserPrivilege();
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
@@ -5250,7 +5276,15 @@ public class FSNamesystem implements Nam
    * RPC call context even if the client exits.
    */
   private boolean isExternalInvocation() {
-    return Server.isRpcInvocation();
+    return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
+  }
+
+  private static InetAddress getRemoteIp() {
+    InetAddress ip = Server.getRemoteIp();
+    if (ip != null) {
+      return ip;
+    }
+    return NamenodeWebHdfsMethods.getRemoteIp();
   }
   
   /**

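Note on the FSNamesystem hunks above: WebHDFS requests are now treated as external invocations for audit logging, and the audited remote address falls back to the WebHDFS request address when there is no RPC caller. A minimal sketch of that fallback, using only the classes imported in the diff (the real helpers are private members of FSNamesystem, so the class and method placement here are illustrative):

  import java.net.InetAddress;
  import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
  import org.apache.hadoop.ipc.Server;

  class AuditCallerInfo {
    /** True if the current call arrived over RPC or WebHDFS. */
    static boolean isExternalInvocation() {
      return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
    }

    /** Prefer the RPC caller's address; fall back to the WebHDFS request address. */
    static InetAddress getRemoteIp() {
      InetAddress ip = Server.getRemoteIp();
      return ip != null ? ip : NamenodeWebHdfsMethods.getRemoteIp();
    }
  }
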
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Wed Sep 26 22:55:00 2012
@@ -71,7 +71,7 @@ public class FileChecksumServlets {
         String tokenString = ugi.getTokens().iterator().next().encodeToUrlString();
         dtParam = JspHelper.getDelegationTokenUrlParam(tokenString);
       }
-      String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+      String addr = nn.getNameNodeAddressHostPortString();
       String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
 
       return new URL(scheme, hostname, port, 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Wed Sep 26 22:55:00 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 
@@ -74,7 +73,7 @@ public class FileDataServlet extends Dfs
     // Add namenode address to the url params
     NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
         getServletContext());
-    String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+    String addr = nn.getNameNodeAddressHostPortString();
     String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
     
     return new URL(scheme, hostname, port,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 26 22:55:00 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,7 +36,6 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Trash;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -73,6 +73,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ExitUtil.ExitException;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 
@@ -488,9 +489,9 @@ public class NameNode {
         LOG.warn("ServicePlugin " + p + " could not be started", t);
       }
     }
-    LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress());
+    LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
     if (rpcServer.getServiceRpcAddress() != null) {
-      LOG.info(getRole() + " service server is up at: "
+      LOG.info(getRole() + " service RPC up at: "
           + rpcServer.getServiceRpcAddress());
     }
   }
@@ -510,7 +511,7 @@ public class NameNode {
     stopHttpServer();
   }
   
-  private void startTrashEmptier(Configuration conf) throws IOException {
+  private void startTrashEmptier(final Configuration conf) throws IOException {
     long trashInterval =
         conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
     if (trashInterval == 0) {
@@ -519,7 +520,18 @@ public class NameNode {
       throw new IOException("Cannot start tresh emptier with negative interval."
           + " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
     }
-    this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
+    
+    // This may be called from the transitionToActive code path, in which
+    // case the current user is the administrator, not the NN. The trash
+    // emptier needs to run as the NN. See HDFS-3972.
+    FileSystem fs = SecurityUtil.doAsLoginUser(
+        new PrivilegedExceptionAction<FileSystem>() {
+          @Override
+          public FileSystem run() throws IOException {
+            return FileSystem.get(conf);
+          }
+        });
+    this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
     this.emptier.setDaemon(true);
     this.emptier.start();
   }
@@ -616,7 +628,7 @@ public class NameNode {
    */
   public void join() {
     try {
-      this.rpcServer.join();
+      rpcServer.join();
     } catch (InterruptedException ie) {
       LOG.info("Caught interrupted exception ", ie);
     }
@@ -664,27 +676,31 @@ public class NameNode {
   }
 
   /**
-   * Returns the address on which the NameNodes is listening to.
-   * @return namenode rpc address
+   * @return NameNode RPC address
    */
   public InetSocketAddress getNameNodeAddress() {
     return rpcServer.getRpcAddress();
   }
-  
+
+  /**
+   * @return NameNode RPC address in "host:port" string form
+   */
+  public String getNameNodeAddressHostPortString() {
+    return NetUtils.getHostPortString(rpcServer.getRpcAddress());
+  }
+
   /**
-   * Returns namenode service rpc address, if set. Otherwise returns
-   * namenode rpc address.
-   * @return namenode service rpc address used by datanodes
+   * @return NameNode service RPC address if configured, the
+   *    NameNode RPC address otherwise
    */
   public InetSocketAddress getServiceRpcAddress() {
-    return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress();
+    final InetSocketAddress serviceAddr = rpcServer.getServiceRpcAddress();
+    return serviceAddr == null ? rpcServer.getRpcAddress() : serviceAddr;
   }
 
   /**
-   * Returns the address of the NameNodes http server, 
-   * which is used to access the name-node web UI.
-   * 
-   * @return the http address.
+   * @return NameNode HTTP address, used by the Web UI, image transfer,
+   *    and HTTP-based file system clients like Hftp and WebHDFS
    */
   public InetSocketAddress getHttpAddress() {
     return httpServer.getHttpAddress();
@@ -1056,6 +1072,10 @@ public class NameNode {
       throws IOException {
     if (conf == null)
       conf = new HdfsConfiguration();
+    // Parse out some generic args into Configuration.
+    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
+    argv = hParser.getRemainingArgs();
+    // Parse the rest, NN specific args.
     StartupOption startOpt = parseArguments(argv);
     if (startOpt == null) {
       printUsage(System.err);
@@ -1154,10 +1174,12 @@ public class NameNode {
           NAMESERVICE_SPECIFIC_KEYS);
     }
     
+    // If the RPC address is set use it to (re-)configure the default FS
     if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
       URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
           + conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
       conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+      LOG.debug("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
     }
   }
     
@@ -1179,8 +1201,9 @@ public class NameNode {
     try {
       StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
       NameNode namenode = createNameNode(argv, null);
-      if (namenode != null)
+      if (namenode != null) {
         namenode.join();
+      }
     } catch (Throwable e) {
       LOG.fatal("Exception in namenode join", e);
       terminate(1, e);

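Note on the startTrashEmptier() change above (HDFS-3972): the trash emptier's FileSystem is now created as the NameNode's login user rather than as whoever triggered the HA state transition. A small self-contained sketch of the doAsLoginUser pattern it relies on (the wrapper class and method name are illustrative, not part of the commit):

  import java.io.IOException;
  import java.security.PrivilegedExceptionAction;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.security.SecurityUtil;

  class TrashEmptierFs {
    /** Obtain the FileSystem as the login (service) user, not the calling user. */
    static FileSystem getAsLoginUser(final Configuration conf) throws IOException {
      return SecurityUtil.doAsLoginUser(
          new PrivilegedExceptionAction<FileSystem>() {
            @Override
            public FileSystem run() throws IOException {
              return FileSystem.get(conf);
            }
          });
    }
  }
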
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Wed Sep 26 22:55:00 2012
@@ -49,12 +49,9 @@ public class NameNodeHttpServer {
   private final Configuration conf;
   private final NameNode nn;
   
-  private final Log LOG = NameNode.LOG;
   private InetSocketAddress httpAddress;
-  
   private InetSocketAddress bindAddress;
   
-  
   public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
   public static final String FSIMAGE_ATTRIBUTE_KEY = "name.system.image";
   protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node";
@@ -68,12 +65,6 @@ public class NameNodeHttpServer {
     this.bindAddress = bindAddress;
   }
   
-  private String getDefaultServerPrincipal() throws IOException {
-    return SecurityUtil.getServerPrincipal(
-        conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
-        nn.getNameNodeAddress().getHostName());
-  }
-
   public void start() throws IOException {
     final String infoHost = bindAddress.getHostName();
     int infoPort = bindAddress.getPort();
@@ -117,8 +108,11 @@ public class NameNodeHttpServer {
               SecurityUtil.getServerPrincipal(principalInConf,
                                               bindAddress.getHostName()));
         }
-        String httpKeytab = conf
-          .get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
+        String httpKeytab = conf.get(
+          DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
+        if (httpKeytab == null) {
+          httpKeytab = conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
+        }
         if (httpKeytab != null && !httpKeytab.isEmpty()) {
           params.put(
             DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
@@ -146,7 +140,8 @@ public class NameNodeHttpServer {
         .getPort());
     }
     httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
-    httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY, nn.getNameNodeAddress());
+    httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY,
+        NetUtils.getConnectAddress(nn.getNameNodeAddress()));
     httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, nn.getFSImage());
     httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     setupServlets(httpServer, conf);

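Note on the NameNodeHttpServer change above: the SPNEGO filter setup now falls back to the NameNode's own keytab when no dedicated web-authentication keytab is configured. A small mirror of that lookup, using the key constants named in the diff (the class and method names here are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  class SpnegoKeytabFallbackExample {
    /** Prefer the web-authentication keytab, otherwise reuse the NameNode keytab. */
    static String resolveHttpKeytab(Configuration conf) {
      String keytab = conf.get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
      if (keytab == null) {
        keytab = conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
      }
      return keytab;
    }

    public static void main(String[] args) {
      System.out.println("SPNEGO keytab: " + resolveHttpKeytab(new HdfsConfiguration()));
    }
  }
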
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Sep 26 22:55:00 2012
@@ -159,10 +159,11 @@ class NameNodeRpcServer implements Namen
     int handlerCount = 
       conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, 
                   DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
-    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
-		RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
-         ProtobufRpcEngine.class);
-     ClientNamenodeProtocolServerSideTranslatorPB 
+
+    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    ClientNamenodeProtocolServerSideTranslatorPB 
        clientProtocolServerTranslator = 
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
      BlockingService clientNNPbService = ClientNamenodeProtocol.
@@ -199,19 +200,24 @@ class NameNodeRpcServer implements Namen
         .newReflectiveBlockingService(haServiceProtocolXlator);
 	  
     WritableRpcEngine.ensureInitialized();
-    
-    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
-    if (dnSocketAddr != null) {
+
+    InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
+    if (serviceRpcAddr != null) {
       int serviceHandlerCount =
         conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
+      serviceRpcServer = new RPC.Builder(conf)
+          .setProtocol(
+              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
+          .setInstance(clientNNPbService)
+          .setBindAddress(serviceRpcAddr.getHostName())
+          .setPort(serviceRpcAddr.getPort())
+          .setNumHandlers(serviceHandlerCount)
+          .setVerbose(false)
+          .setSecretManager(namesystem.getDelegationTokenSecretManager())
+          .build();
+
       // Add all the RPC protocols that the namenode implements
-      this.serviceRpcServer = 
-          RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
-              ClientNamenodeProtocolPB.class, clientNNPbService,
-          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), 
-          serviceHandlerCount,
-          false, conf, namesystem.getDelegationTokenSecretManager());
       DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
           serviceRpcServer);
       DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
@@ -225,18 +231,26 @@ class NameNodeRpcServer implements Namen
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
-      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
+      serviceRPCAddress = serviceRpcServer.getListenerAddress();
       nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
     } else {
       serviceRpcServer = null;
       serviceRPCAddress = null;
     }
+
+    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
+    clientRpcServer = new RPC.Builder(conf)
+        .setProtocol(
+            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
+        .setInstance(clientNNPbService)
+        .setBindAddress(rpcAddr.getHostName())
+        .setPort(rpcAddr.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .setSecretManager(namesystem.getDelegationTokenSecretManager())
+        .build();
+
     // Add all the RPC protocols that the namenode implements
-    this.clientRpcServer = RPC.getServer(
-        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, 
-        clientNNPbService, socAddr.getHostName(),
-            socAddr.getPort(), handlerCount, false, conf,
-            namesystem.getDelegationTokenSecretManager());
     DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
         clientRpcServer);
     DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
@@ -254,44 +268,54 @@ class NameNodeRpcServer implements Namen
     if (serviceAuthEnabled =
           conf.getBoolean(
             CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
-      if (this.serviceRpcServer != null) {
-        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      if (serviceRpcServer != null) {
+        serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
       }
     }
 
     // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.clientRpcAddress = this.clientRpcServer.getListenerAddress(); 
+    clientRpcAddress = clientRpcServer.getListenerAddress();
     nn.setRpcServerAddress(conf, clientRpcAddress);
     
-    this.minimumDataNodeVersion = conf.get(
+    minimumDataNodeVersion = conf.get(
         DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
         DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
 
     // Set terse exception whose stack trace won't be logged
-    this.clientRpcServer.addTerseExceptions(SafeModeException.class);
+    clientRpcServer.addTerseExceptions(SafeModeException.class);
  }
   
   /**
-   * Actually start serving requests.
+   * Start client and service RPC servers.
    */
   void start() {
-    clientRpcServer.start();  //start RPC server
+    clientRpcServer.start();
     if (serviceRpcServer != null) {
       serviceRpcServer.start();      
     }
   }
   
   /**
-   * Wait until the RPC server has shut down.
+   * Wait until the RPC servers have shutdown.
    */
   void join() throws InterruptedException {
-    this.clientRpcServer.join();
+    clientRpcServer.join();
+    if (serviceRpcServer != null) {
+      serviceRpcServer.join();      
+    }
   }
-  
+
+  /**
+   * Stop client and service RPC servers.
+   */
   void stop() {
-    if(clientRpcServer != null) clientRpcServer.stop();
-    if(serviceRpcServer != null) serviceRpcServer.stop();
+    if (clientRpcServer != null) {
+      clientRpcServer.stop();
+    }
+    if (serviceRpcServer != null) {
+      serviceRpcServer.stop();
+    }
   }
   
   InetSocketAddress getServiceRpcAddress() {
@@ -328,8 +352,9 @@ class NameNodeRpcServer implements Namen
     namesystem.checkOperation(OperationCategory.UNCHECKED);
     verifyRequest(registration);
     LOG.info("Error report from " + registration + ": " + msg);
-    if(errorCode == FATAL)
+    if (errorCode == FATAL) {
       namesystem.releaseBackupNode(registration);
+    }
   }
 
   @Override // NamenodeProtocol
@@ -704,6 +729,13 @@ class NameNodeRpcServer implements Namen
     namesystem.checkOperation(OperationCategory.UNCHECKED);
     namesystem.saveNamespace();
   }
+  
+  @Override // ClientProtocol
+  public long rollEdits() throws AccessControlException, IOException {
+    namesystem.checkOperation(OperationCategory.JOURNAL);
+    CheckpointSignature sig = namesystem.rollEditLog();
+    return sig.getCurSegmentTxId();
+  }
 
   @Override // ClientProtocol
   public void refreshNodes() throws IOException {

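Note on the NameNodeRpcServer hunks above: both RPC servers move from the positional RPC.getServer(...) overloads to the fluent RPC.Builder API. For readability, here is the client RPC server construction from the hunk gathered into one expression; every identifier (conf, clientNNPbService, rpcAddr, handlerCount, namesystem) is a field or local of the constructor shown in the diff, so this fragment is not standalone code:

  clientRpcServer = new RPC.Builder(conf)
      .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)        // BlockingService from the protobuf translator
      .setBindAddress(rpcAddr.getHostName())
      .setPort(rpcAddr.getPort())
      .setNumHandlers(handlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();
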
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Sep 26 22:55:00 2012
@@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -395,7 +394,7 @@ class NamenodeJspHelper {
       nodeToRedirect = nn.getHttpAddress().getHostName();
       redirectPort = nn.getHttpAddress().getPort();
     }
-    String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
+    String addr = nn.getNameNodeAddressHostPortString();
     String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();
     redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" + redirectPort
         + "/browseDirectory.jsp?namenodeInfoPort="
@@ -566,8 +565,9 @@ class NamenodeJspHelper {
       final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       dm.fetchDatanodes(live, dead, true);
 
-      InetSocketAddress nnSocketAddress = (InetSocketAddress) context
-          .getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+      InetSocketAddress nnSocketAddress =
+          (InetSocketAddress)context.getAttribute(
+              NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
       String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
           + nnSocketAddress.getPort();
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Wed Sep 26 22:55:00 2012
@@ -32,9 +32,12 @@ import javax.servlet.http.HttpServletRes
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.MD5Hash;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 
@@ -54,6 +58,8 @@ public class TransferFsImage {
   
   public final static String CONTENT_LENGTH = "Content-Length";
   public final static String MD5_HEADER = "X-MD5-Digest";
+  @VisibleForTesting
+  static int timeout = 0;
 
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
   
@@ -222,6 +228,18 @@ public class TransferFsImage {
     HttpURLConnection connection = (HttpURLConnection)
       SecurityUtil.openSecureHttpConnection(url);
 
+    if (timeout <= 0) {
+      // Set the ping interval as timeout
+      Configuration conf = new HdfsConfiguration();
+      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
+    }
+
+    if (timeout > 0) {
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+    }
+
     if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
       throw new HttpGetFailedException(
           "Image transfer servlet at " + url +

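Note on the TransferFsImage change above: checkpoint image transfers now honor a configurable timeout so a hung HTTP connection cannot block the transfer indefinitely. A minimal sketch of the same lookup-and-apply logic, assuming an already-open HttpURLConnection (the wrapper class is illustrative; the key constants are the ones referenced in the diff):

  import java.net.HttpURLConnection;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  class ImageTransferTimeoutExample {
    /** Apply the configured image-transfer timeout as both connect and read timeout. */
    static void applyTimeout(HttpURLConnection connection) {
      Configuration conf = new HdfsConfiguration();
      int timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
      if (timeout > 0) {
        connection.setConnectTimeout(timeout);
        connection.setReadTimeout(timeout);
      }
    }
  }
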
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Sep 26 22:55:00 2012
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.EnumSet;
 
 import javax.servlet.ServletContext;
@@ -92,6 +94,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -116,9 +119,20 @@ public class NamenodeWebHdfsMethods {
     return REMOTE_ADDRESS.get();
   }
 
-  /** Set the remote client address. */
-  static void setRemoteAddress(String remoteAddress) {
-    REMOTE_ADDRESS.set(remoteAddress);
+  public static InetAddress getRemoteIp() {
+    try {
+      return InetAddress.getByName(getRemoteAddress());
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Returns true if a WebHdfs request is in progress.  Akin to
+   * {@link Server#isRpcInvocation()}.
+   */
+  public static boolean isWebHdfsInvocation() {
+    return getRemoteAddress() != null;
   }
 
   private @Context ServletContext context;
@@ -150,8 +164,9 @@ public class NamenodeWebHdfsMethods {
       final DatanodeDescriptor clientNode = bm.getDatanodeManager(
           ).getDatanodeByHost(getRemoteAddress());
       if (clientNode != null) {
-        final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy(
-            ).chooseTarget(path, 1, clientNode, null, blocksize);
+        final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy()
+            .chooseTarget(path, 1, clientNode,
+                new ArrayList<DatanodeDescriptor>(), false, null, blocksize);
         if (datanodes.length > 0) {
           return datanodes[0];
         }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Wed Sep 26 22:55:00 2012
@@ -420,6 +420,14 @@ public class DFSAdmin extends FsShell {
     return exitCode;
   }
 
+  public int rollEdits() throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    long txid = dfs.rollEdits();
+    System.out.println("Successfully rolled edit logs.");
+    System.out.println("New segment starts at txid " + txid);
+    return 0;
+  }
+  
   /**
    * Command to enable/disable/check restoring of failed storage replicas in the namenode.
    * Usage: java DFSAdmin -restoreFailedStorage true|false|check
@@ -516,6 +524,7 @@ public class DFSAdmin extends FsShell {
       "The full syntax is: \n\n" +
       "hadoop dfsadmin [-report] [-safemode <enter | leave | get | wait>]\n" +
       "\t[-saveNamespace]\n" +
+      "\t[-rollEdits]\n" +
       "\t[-restoreFailedStorage true|false|check]\n" +
       "\t[-refreshNodes]\n" +
       "\t[" + SetQuotaCommand.USAGE + "]\n" +
@@ -548,6 +557,10 @@ public class DFSAdmin extends FsShell {
     "Save current namespace into storage directories and reset edits log.\n" +
     "\t\tRequires superuser permissions and safe mode.\n";
 
+    String rollEdits = "-rollEdits:\t" +
+    "Rolls the edit log.\n" +
+    "\t\tRequires superuser permissions.\n";
+    
     String restoreFailedStorage = "-restoreFailedStorage:\t" +
     "Set/Unset/Check flag to attempt restore of failed storage replicas if they become available.\n" +
     "\t\tRequires superuser permissions.\n";
@@ -625,6 +638,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(safemode);
     } else if ("saveNamespace".equals(cmd)) {
       System.out.println(saveNamespace);
+    } else if ("rollEdits".equals(cmd)) {
+      System.out.println(rollEdits);
     } else if ("restoreFailedStorage".equals(cmd)) {
       System.out.println(restoreFailedStorage);
     } else if ("refreshNodes".equals(cmd)) {
@@ -664,6 +679,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(report);
       System.out.println(safemode);
       System.out.println(saveNamespace);
+      System.out.println(rollEdits);
       System.out.println(restoreFailedStorage);
       System.out.println(refreshNodes);
       System.out.println(finalizeUpgrade);
@@ -859,6 +875,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-saveNamespace".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-saveNamespace]");
+    } else if ("-rollEdits".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-rollEdits]");
     } else if ("-restoreFailedStorage".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
           + " [-restoreFailedStorage true|false|check ]");
@@ -913,6 +932,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-report]");
       System.err.println("           [-safemode enter | leave | get | wait]");
       System.err.println("           [-saveNamespace]");
+      System.err.println("           [-rollEdits]");
       System.err.println("           [-restoreFailedStorage true|false|check]");
       System.err.println("           [-refreshNodes]");
       System.err.println("           [-finalizeUpgrade]");
@@ -970,6 +990,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-rollEdits".equals(cmd)) {
+      if (argv.length != 1) {
+        printUsage(cmd);
+        return exitCode;
+      }      
     } else if ("-restoreFailedStorage".equals(cmd)) {
       if (argv.length != 2) {
         printUsage(cmd);
@@ -1048,6 +1073,8 @@ public class DFSAdmin extends FsShell {
         setSafeMode(argv, i);
       } else if ("-saveNamespace".equals(cmd)) {
         exitCode = saveNamespace();
+      } else if ("-rollEdits".equals(cmd)) {
+        exitCode = rollEdits();
       } else if ("-restoreFailedStorage".equals(cmd)) {
         exitCode = restoreFaileStorage(argv[i]);
       } else if ("-refreshNodes".equals(cmd)) {

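Note on the DFSAdmin change above: the new -rollEdits subcommand asks the NameNode to close the current edit log segment and start a new one, printing the first transaction id of the new segment. A programmatic sketch of the same call (assumes fs.defaultFS points at the target NameNode; the wrapper class is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.hdfs.DistributedFileSystem;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  class RollEditsExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new HdfsConfiguration();
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
      long txid = dfs.rollEdits();   // same call DFSAdmin.rollEdits() makes
      System.out.println("New segment starts at txid " + txid);
    }
  }

From the command line the equivalent is "hadoop dfsadmin -rollEdits", which per the new help text requires superuser permissions.
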
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Sep 26 22:55:00 2012
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -89,6 +88,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -181,7 +181,14 @@ public class WebHdfsFileSystem extends F
       throw new IllegalArgumentException(e);
     }
     this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
-    this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
+    this.retryPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class);
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1379224-1390762

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Wed Sep 26 22:55:00 2012
@@ -276,6 +276,13 @@ message SaveNamespaceRequestProto { // n
 message SaveNamespaceResponseProto { // void response
 }
 
+message RollEditsRequestProto { // no parameters
+}
+
+message RollEditsResponseProto { // response
+  required uint64 newSegmentTxId = 1;
+}
+
 message RestoreFailedStorageRequestProto {
   required string arg = 1;
 }
@@ -472,6 +479,8 @@ service ClientNamenodeProtocol {
       returns(SetSafeModeResponseProto);
   rpc saveNamespace(SaveNamespaceRequestProto)
       returns(SaveNamespaceResponseProto);
+  rpc rollEdits(RollEditsRequestProto)
+      returns(RollEditsResponseProto);
   rpc restoreFailedStorage(RestoreFailedStorageRequestProto)
       returns(RestoreFailedStorageResponseProto);
   rpc refreshNodes(RefreshNodesRequestProto) returns(RefreshNodesResponseProto);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Sep 26 22:55:00 2012
@@ -961,6 +961,30 @@
 </property>
 
 <property>
+  <name>dfs.namenode.check.stale.datanode</name>
+  <value>false</value>
+  <description>
+  	Indicate whether or not to check "stale" datanodes whose 
+  	heartbeat messages have not been received by the namenode 
+  	for more than a specified time interval. If this configuration 
+  	parameter is set as true, the stale datanodes will be moved to 
+  	the end of the target node list for reading. The writing will 
+  	also try to avoid stale nodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.stale.datanode.interval</name>
+  <value>30000</value>
+  <description>
+  	Default time interval for marking a datanode as "stale", i.e., if 
+  	the namenode has not received heartbeat msg from a datanode for 
+  	more than this time interval, the datanode will be marked and treated 
+  	as "stale" by default.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
   <value>0.32f</value>
   <description>

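Note on the hdfs-default.xml additions above: dfs.namenode.check.stale.datanode enables stale-datanode handling (stale datanodes are moved to the end of the read target list and writes try to avoid them), and dfs.namenode.stale.datanode.interval sets how long a datanode may miss heartbeats before being treated as stale. A minimal sketch of enabling the feature programmatically, using the property names and default value shown above (the wrapper class is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.HdfsConfiguration;

  class StaleDatanodeConfigExample {
    static Configuration buildConf() {
      Configuration conf = new HdfsConfiguration();
      conf.setBoolean("dfs.namenode.check.stale.datanode", true);
      conf.setLong("dfs.namenode.stale.datanode.interval", 30000L);  // default shown above
      return conf;
    }
  }
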
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1379224-1390762

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1379224-1390762

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp Wed Sep 26 22:55:00 2012
@@ -34,8 +34,7 @@
   HAServiceState nnHAState = nn.getServiceState();
   boolean isActive = (nnHAState == HAServiceState.ACTIVE);
   String namenodeRole = nn.getRole().toString();
-  String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":"
-      + nn.getNameNodeAddress().getPort();
+  String namenodeLabel = nn.getNameNodeAddressHostPortString();
   Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = 
 	fsn.listCorruptFileBlocks("/", null);
   int corruptFileCount = corruptFileBlocks.size();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Wed Sep 26 22:55:00 2012
@@ -34,7 +34,7 @@
   boolean isActive = (nnHAState == HAServiceState.ACTIVE);
   String namenodeRole = nn.getRole().toString();
   String namenodeState = nnHAState.toString();
-  String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":" + nn.getNameNodeAddress().getPort();
+  String namenodeLabel = nn.getNameNodeAddressHostPortString();
 %>
 
 <!DOCTYPE html>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp Wed Sep 26 22:55:00 2012
@@ -33,7 +33,7 @@ String namenodeRole = nn.getRole().toStr
 FSNamesystem fsn = nn.getNamesystem();
 HAServiceState nnHAState = nn.getServiceState();
 boolean isActive = (nnHAState == HAServiceState.ACTIVE);
-String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":" + nn.getNameNodeAddress().getPort();
+String namenodeLabel = nn.getNameNodeAddressHostPortString();
 %>
 
 <!DOCTYPE html>

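All three JSPs now delegate host:port formatting to a single NameNode accessor. The accessor itself is not part of this excerpt; a plausible shape, inferred from the MiniDFSCluster hunk further down where the call replaces NetUtils.getHostPortString(nn.getNameNodeAddress()):

    // Hypothetical sketch of the accessor these JSPs now call; the real
    // method lives in NameNode.java, which is not shown in this excerpt.
    public String getNameNodeAddressHostPortString() {
      return NetUtils.getHostPortString(getNameNodeAddress());
    }
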
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1379224-1390762

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1379224-1390762

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Sep 26 22:55:00 2012
@@ -273,7 +273,7 @@ public class DFSTestUtil {
    * specified target.
    */
   public void waitReplication(FileSystem fs, String topdir, short value) 
-                                              throws IOException {
+      throws IOException, InterruptedException, TimeoutException {
     Path root = new Path(topdir);
 
     /** wait for the replication factor to settle down */
@@ -498,36 +498,44 @@ public class DFSTestUtil {
       return fileNames;
     }
   }
-  
-  /** wait for the file's replication to be done */
-  public static void waitReplication(FileSystem fs, Path fileName, 
-      short replFactor)  throws IOException {
-    boolean good;
+
+  /**
+   * Wait for the given file to reach the given replication factor.
+   * @throws TimeoutException if we fail to sufficiently replicate the file
+   */
+  public static void waitReplication(FileSystem fs, Path fileName, short replFactor)
+      throws IOException, InterruptedException, TimeoutException {
+    boolean correctReplFactor;
+    final int ATTEMPTS = 40;
+    int count = 0;
+
     do {
-      good = true;
+      correctReplFactor = true;
       BlockLocation locs[] = fs.getFileBlockLocations(
         fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
+      count++;
       for (int j = 0; j < locs.length; j++) {
         String[] hostnames = locs[j].getNames();
         if (hostnames.length != replFactor) {
-          String hostNameList = "";
-          for (String h : hostnames) hostNameList += h + " ";
-          System.out.println("Block " + j + " of file " + fileName 
-              + " has replication factor " + hostnames.length + "; locations "
-              + hostNameList);
-          good = false;
-          try {
-            System.out.println("Waiting for replication factor to drain");
-            Thread.sleep(100);
-          } catch (InterruptedException e) {} 
+          correctReplFactor = false;
+          System.out.println("Block " + j + " of file " + fileName
+              + " has replication factor " + hostnames.length
+              + " (desired " + replFactor + "); locations "
+              + Joiner.on(' ').join(hostnames));
+          Thread.sleep(1000);
           break;
         }
       }
-      if (good) {
+      if (correctReplFactor) {
         System.out.println("All blocks of file " + fileName
             + " verified to have replication factor " + replFactor);
       }
-    } while(!good);
+    } while (!correctReplFactor && count < ATTEMPTS);
+
+    if (!correctReplFactor) {
+      throw new TimeoutException("Timed out waiting for " + fileName +
+          " to reach " + replFactor + " replicas");
+    }
   }
   
   /** delete directory and everything underneath it.*/

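Because waitReplication now declares InterruptedException and TimeoutException, callers pick up the new signature rather than looping forever. A hedged sketch of a hypothetical caller (file name and replication factor are illustrative):

    // Sketch: create a file, then block until it reaches 3 replicas or the
    // 40 one-second attempts above are exhausted.
    Path file = new Path("/test/waitRepl.dat");
    DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0L);
    try {
      DFSTestUtil.waitReplication(fs, file, (short) 3);
    } catch (TimeoutException e) {
      Assert.fail("replication never reached 3: " + e.getMessage());
    }
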
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Sep 26 22:55:00 2012
@@ -857,8 +857,8 @@ public class MiniDFSCluster {
     // After the NN has started, set back the bound ports into
     // the conf
     conf.set(DFSUtil.addKeySuffixes(
-        DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnId), NetUtils
-        .getHostPortString(nn.getNameNodeAddress()));
+        DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnId),
+        nn.getNameNodeAddressHostPortString());
     conf.set(DFSUtil.addKeySuffixes(
         DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId, nnId), NetUtils
         .getHostPortString(nn.getHttpAddress()));
@@ -880,8 +880,8 @@ public class MiniDFSCluster {
    * @return URI of the given namenode in MiniDFSCluster
    */
   public URI getURI(int nnIndex) {
-    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress();
-    String hostPort = NetUtils.getHostPortString(addr);
+    String hostPort =
+        nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
     URI uri = null;
     try {
       uri = new URI("hdfs://" + hostPort);
@@ -918,7 +918,8 @@ public class MiniDFSCluster {
   /**
    * wait for the cluster to get out of safemode.
    */
-  public void waitClusterUp() {
+  public void waitClusterUp() throws IOException {
+    int i = 0;
     if (numDataNodes > 0) {
       while (!isClusterUp()) {
         try {
@@ -926,6 +927,9 @@ public class MiniDFSCluster {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
         }
+        if (++i > 10) {
+          throw new IOException("Timed out waiting for Mini HDFS Cluster to start");
+        }
       }
     }
   }
@@ -1354,6 +1358,7 @@ public class MiniDFSCluster {
       if (ExitUtil.terminateCalled()) {
         LOG.fatal("Test resulted in an unexpected exit",
             ExitUtil.getFirstExitException());
+        ExitUtil.resetFirstExitException();
         throw new AssertionError("Test resulted in an unexpected exit");
       }
     }

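waitClusterUp() now gives up after roughly ten one-second sleeps instead of hanging. A small sketch of a test relying on the new failure mode (configuration and node count are placeholders):

    // Sketch: a start-up hang now surfaces as an IOException the test can report.
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
        .numDataNodes(1).build();
    try {
      cluster.waitClusterUp();   // throws IOException after ~10 attempts if the cluster never comes up
    } finally {
      cluster.shutdown();
    }
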
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Wed Sep 26 22:55:00 2012
@@ -61,7 +61,7 @@ public class TestBlockReaderLocal {
    * of this class might immediately issue a retry on failure, so it's polite.
    */
   @Test
-  public void testStablePositionAfterCorruptRead() throws IOException {
+  public void testStablePositionAfterCorruptRead() throws Exception {
     final short REPL_FACTOR = 1;
     final long FILE_LENGTH = 512L;
     cluster.waitActive();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientReportBadBlock.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientReportBadBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientReportBadBlock.java Wed Sep 26 22:55:00 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -199,11 +200,11 @@ public class TestClientReportBadBlock {
   }
 
   /**
-   * create a file with one block and corrupt some/all of the block replicas.
+   * Create a file with one block and corrupt some/all of the block replicas.
    */
   private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
       int corruptBlockCount) throws IOException, AccessControlException,
-      FileNotFoundException, UnresolvedLinkException {
+      FileNotFoundException, UnresolvedLinkException, InterruptedException, TimeoutException {
     DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
     DFSTestUtil.waitReplication(dfs, filePath, repl);
     // Locate the file blocks by asking name node

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Sep 26 22:55:00 2012
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,10 +55,12 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
-
+  final static int CACHE_SIZE = 4;
+  final static long CACHE_EXPIRY_MS = 200;
   static Configuration conf = null;
   static MiniDFSCluster cluster = null;
   static FileSystem fs = null;
+  static SocketCache cache;
 
   static final Path testFile = new Path("/testConnCache.dat");
   static byte authenticData[] = null;
@@ -93,6 +96,9 @@ public class TestConnCache {
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
 
+    /* Create the socket cache; there is only one socket cache per JVM. */
+    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
     util = new BlockReaderTestUtil(REPLICATION_FACTOR);
     cluster = util.getCluster();
     conf = util.getConf();
@@ -142,10 +148,7 @@ public class TestConnCache {
    * Test the SocketCache itself.
    */
   @Test
-  public void testSocketCache() throws IOException {
-    final int CACHE_SIZE = 4;
-    SocketCache cache = new SocketCache(CACHE_SIZE);
-
+  public void testSocketCache() throws Exception {
     // Make a client
     InetSocketAddress nnAddr =
         new InetSocketAddress("localhost", cluster.getNameNodePort());
@@ -159,6 +162,7 @@ public class TestConnCache {
     DataNode dn = util.getDataNode(block);
     InetSocketAddress dnAddr = dn.getXferAddress();
 
+
     // Make some sockets to the DN
     Socket[] dnSockets = new Socket[CACHE_SIZE];
     for (int i = 0; i < dnSockets.length; ++i) {
@@ -166,6 +170,7 @@ public class TestConnCache {
           dnAddr.getAddress(), dnAddr.getPort());
     }
 
+
     // Insert a socket to the NN
     Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
     cache.put(nnSock, null);
@@ -179,7 +184,7 @@ public class TestConnCache {
 
     assertEquals("NN socket evicted", null, cache.get(nnAddr));
     assertTrue("Evicted socket closed", nnSock.isClosed());
-
+ 
     // Lookup the DN socks
     for (Socket dnSock : dnSockets) {
       assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
@@ -189,6 +194,51 @@ public class TestConnCache {
     assertEquals("Cache is empty", 0, cache.size());
   }
 
+
+  /**
+   * Test the SocketCache expiry.
+   * Verify that socket cache entries expire after the set
+   * expiry time.
+   */
+  @Test
+  public void testSocketCacheExpiry() throws Exception {
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+    // Make some sockets to the DN and put in cache
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+      cache.put(dnSockets[i], null);
+    }
+
+    // Client side still has the sockets cached
+    assertEquals(CACHE_SIZE, client.socketCache.size());
+
+    //sleep for a second and see if it expired
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+    
+    // Client side has no sockets cached
+    assertEquals(0, client.socketCache.size());
+
+    //sleep for another second and see if 
+    //the daemon thread runs fine on empty cache
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+  }
+
+
   /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
@@ -229,33 +279,6 @@ public class TestConnCache {
 
     in.close();
   }
-  
-  /**
-   * Test that the socket cache can be disabled by setting the capacity to
-   * 0. Regression test for HDFS-3365.
-   */
-  @Test
-  public void testDisableCache() throws IOException {
-    LOG.info("Starting testDisableCache()");
-
-    // Reading with the normally configured filesystem should
-    // cache a socket.
-    DFSTestUtil.readFile(fs, testFile);
-    assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size());
-    
-    // Configure a new instance with no caching, ensure that it doesn't
-    // cache anything
-    Configuration confWithoutCache = new Configuration(fs.getConf());
-    confWithoutCache.setInt(
-        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
-    FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache);
-    try {
-      DFSTestUtil.readFile(fsWithoutCache, testFile);
-      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
-    } finally {
-      fsWithoutCache.close();
-    }
-  }
 
   @AfterClass
   public static void teardownCluster() throws Exception {

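With the cache now obtained as a JVM-wide singleton carrying its own expiry daemon, a minimal sketch of the expiry behaviour using a loopback socket (the capacity, expiry, and local listener are illustrative, not taken from the test above):

    // Sketch: entries put into the singleton cache disappear once the
    // expiry window passes, mirroring testSocketCacheExpiry above.
    SocketCache cache = SocketCache.getInstance(4, 200);       // capacity 4, 200 ms expiry
    ServerSocket listener = new ServerSocket(0);                // throwaway loopback peer
    Socket sock = new Socket("localhost", listener.getLocalPort());
    cache.put(sock, null);
    Thread.sleep(1200);                                         // comfortably past expiry
    assertEquals("expired entries evicted", 0, cache.size());
    listener.close();
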
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Wed Sep 26 22:55:00 2012
@@ -789,8 +789,7 @@ public class TestDFSClientRetries {
    * way. See HDFS-3067.
    */
   @Test
-  public void testRetryOnChecksumFailure()
-      throws UnresolvedLinkException, IOException {
+  public void testRetryOnChecksumFailure() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
       new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
@@ -831,7 +830,7 @@ public class TestDFSClientRetries {
   }
 
   /** Test client retry with namenode restarting. */
-  @Test
+  @Test(timeout=300000)
   public void testNamenodeRestart() throws Exception {
     namenodeRestartTest(new Configuration(), false);
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java Wed Sep 26 22:55:00 2012
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.Scanner;
+import java.util.zip.DeflaterOutputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.logging.Log;
@@ -52,11 +53,16 @@ import org.apache.hadoop.hdfs.tools.DFSA
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
 /**
  * This class tests commands from DFSShell.
  */
@@ -575,6 +581,8 @@ public class TestDFSShell {
     try {
       final FileSystem fs = root.getFileSystem(conf);
       fs.mkdirs(root);
+
+      // Test the gzip type of files. Magic detection.
       OutputStream zout = new GZIPOutputStream(
           fs.create(new Path(root, "file.gz")));
       Random r = new Random();
@@ -599,7 +607,7 @@ public class TestDFSShell {
           Arrays.equals(file.toByteArray(), out.toByteArray()));
 
       // Create a sequence file with a gz extension, to test proper
-      // container detection
+      // container detection. Magic detection.
       SequenceFile.Writer writer = SequenceFile.createWriter(
           conf,
           SequenceFile.Writer.file(new Path(root, "file.gz")),
@@ -617,6 +625,45 @@ public class TestDFSShell {
       assertTrue("Output doesn't match input",
           Arrays.equals("Foo\tBar\n".getBytes(), out.toByteArray()));
       out.reset();
+
+      // Test deflate. Extension-based detection.
+      OutputStream dout = new DeflaterOutputStream(
+          fs.create(new Path(root, "file.deflate")));
+      byte[] outbytes = "foo".getBytes();
+      dout.write(outbytes);
+      dout.close();
+      out = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(out));
+      argv = new String[2];
+      argv[0] = "-text";
+      argv[1] = new Path(root, "file.deflate").toString();
+      ret = ToolRunner.run(new FsShell(conf), argv);
+      assertEquals("'-text " + argv[1] + " returned " + ret, 0, ret);
+      assertTrue("Output doesn't match input",
+          Arrays.equals(outbytes, out.toByteArray()));
+      out.reset();
+
+      // Test a simple codec. Extension-based detection. We use
+      // Bzip2 because it is non-native.
+      CompressionCodec codec = (CompressionCodec)
+          ReflectionUtils.newInstance(BZip2Codec.class, conf);
+      String extension = codec.getDefaultExtension();
+      Path p = new Path(root, "file." + extension);
+      OutputStream fout = new DataOutputStream(codec.createOutputStream(
+          fs.create(p, true)));
+      byte[] writebytes = "foo".getBytes();
+      fout.write(writebytes);
+      fout.close();
+      out = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(out));
+      argv = new String[2];
+      argv[0] = "-text";
+      argv[1] = new Path(root, p).toString();
+      ret = ToolRunner.run(new FsShell(conf), argv);
+      assertEquals("'-text " + argv[1] + " returned " + ret, 0, ret);
+      assertTrue("Output doesn't match input",
+          Arrays.equals(writebytes, out.toByteArray()));
+      out.reset();
     } finally {
       if (null != bak) {
         System.setOut(bak);
@@ -1284,6 +1331,11 @@ public class TestDFSShell {
   public void testGet() throws IOException {
     DFSTestUtil.setLogLevel2All(FSInputChecker.LOG);
     final Configuration conf = new HdfsConfiguration();
+    // A race can happen here: the block scanner may be reading the file while
+    // the test tries to corrupt it, which fails the test on Windows.
+    // Disable the block scanner to avoid this race.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
 
@@ -1475,4 +1527,95 @@ public class TestDFSShell {
 
   }
 
+  /**
+   * Delete a file optionally configuring trash on the server and client.
+   */
+  private void deleteFileUsingTrash(
+      boolean serverTrash, boolean clientTrash) throws Exception {
+    // Run a cluster, optionally with trash enabled on the server
+    Configuration serverConf = new HdfsConfiguration();
+    if (serverTrash) {
+      serverConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
+    }
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf)
+      .numDataNodes(1).format(true).build();
+    Configuration clientConf = new Configuration(serverConf);
+
+    // Create a client, optionally with trash enabled
+    if (clientTrash) {
+      clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
+    } else {
+      clientConf.setLong(FS_TRASH_INTERVAL_KEY, 0);
+    }
+
+    FsShell shell = new FsShell(clientConf);
+    FileSystem fs = null;
+
+    try {
+      // Create and delete a file
+      fs = cluster.getFileSystem();
+      writeFile(fs, new Path(TEST_ROOT_DIR, "foo"));
+      final String testFile = TEST_ROOT_DIR + "/foo";
+      final String trashFile = shell.getCurrentTrashDir() + "/" + testFile;
+      String[] argv = new String[] { "-rm", testFile };
+      int res = ToolRunner.run(shell, argv);
+      assertEquals("rm failed", 0, res);
+
+      if (serverTrash) {
+        // If the server config was set we should use it unconditionally
+        assertTrue("File not in trash", fs.exists(new Path(trashFile)));
+      } else if (clientTrash) {
+        // If the server config was not set but the client config was
+        // set then we should use it
+        assertTrue("File not in trashed", fs.exists(new Path(trashFile)));
+      } else {
+        // If neither was set then we should not have trashed the file
+        assertFalse("File was not removed", fs.exists(new Path(testFile)));
+        assertFalse("File was trashed", fs.exists(new Path(trashFile)));
+      }
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test that the server trash configuration is respected when
+   * the client configuration is not set.
+   */
+  @Test
+  public void testServerConfigRespected() throws Exception {
+    deleteFileUsingTrash(true, false);
+  }
+
+  /**
+   * Test that server trash configuration is respected even when the
+   * client configuration is set.
+   */
+  @Test
+  public void testServerConfigRespectedWithClient() throws Exception {
+    deleteFileUsingTrash(true, true);
+  }
+
+  /**
+   * Test that the client trash configuration is respected when
+   * the server configuration is not set.
+   */
+  @Test
+  public void testClientConfigRespected() throws Exception {
+    deleteFileUsingTrash(false, true);
+  }
+
+  /**
+   * Test that trash is disabled by default.
+   */
+  @Test
+  public void testNoTrashConfig() throws Exception {
+    deleteFileUsingTrash(false, false);
+  }
 }

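The four tests above encode the intended precedence: a server-side fs.trash.interval always wins, and the client-side value only matters when the server leaves it unset. A short sketch of the client-only case, using the same FS_TRASH_INTERVAL_KEY import added at the top of the file (the path is illustrative):

    // Sketch: enable trash on the client only (value is in minutes); with the
    // server unset, "-rm" should move the file into the user's trash directory.
    Configuration clientConf = new Configuration();
    clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
    FsShell shell = new FsShell(clientConf);
    int res = ToolRunner.run(shell, new String[] { "-rm", "/some/file" });
    assertEquals(0, res);
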
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Wed Sep 26 22:55:00 2012
@@ -34,14 +34,19 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
 import org.junit.Test;
 
 /**
@@ -59,6 +64,10 @@ public class TestDatanodeBlockScanner {
   
   private static Pattern pattern_blockVerify = 
              Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
+  
+  static {
+    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+  }
   /**
    * This connects to datanode and fetches block verification data.
    * It repeats this until the given block has a verification time > newTime.
@@ -173,7 +182,7 @@ public class TestDatanodeBlockScanner {
   }
 
   @Test
-  public void testBlockCorruptionPolicy() throws IOException {
+  public void testBlockCorruptionPolicy() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     Random random = new Random();
@@ -206,12 +215,12 @@ public class TestDatanodeBlockScanner {
     assertTrue(MiniDFSCluster.corruptReplica(1, block));
     assertTrue(MiniDFSCluster.corruptReplica(2, block));
 
-    // Read the file to trigger reportBadBlocks by client
-    try {
-      IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
-                        conf, true);
-    } catch (IOException e) {
-      // Ignore exception
+    // Trigger each of the DNs to scan this block immediately.
+    // The block pool scanner doesn't run frequently enough on its own
+    // to notice these, and due to HDFS-1371, the client won't report
+    // bad blocks to the NN when all replicas are bad.
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.runBlockScannerForBlock(dn, block);
     }
 
     // We now have the blocks to be marked as corrupt and we get back all
@@ -260,6 +269,7 @@ public class TestDatanodeBlockScanner {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     cluster.waitActive();
@@ -267,35 +277,47 @@ public class TestDatanodeBlockScanner {
     Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
     DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
+    final int ITERATIONS = 10;
 
     // Wait until block is replicated to numReplicas
     DFSTestUtil.waitReplication(fs, file1, numReplicas);
 
-    // Corrupt numCorruptReplicas replicas of block 
-    int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
-    for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-      if (corruptReplica(block, i)) {
-        corruptReplicasDNIDs[j++] = i;
-        LOG.info("successfully corrupted block " + block + " on node " 
-                 + i + " " + cluster.getDataNodes().get(i).getDisplayName());
+    for (int k = 0; ; k++) {
+      // Corrupt numCorruptReplicas replicas of block 
+      int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
+      for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
+        if (corruptReplica(block, i)) {
+          corruptReplicasDNIDs[j++] = i;
+          LOG.info("successfully corrupted block " + block + " on node " 
+                   + i + " " + cluster.getDataNodes().get(i).getDisplayName());
+        }
+      }
+      
+      // Restart the datanodes containing corrupt replicas 
+      // so they would be reported to namenode and re-replicated
+      // They MUST be restarted in reverse order from highest to lowest index,
+      // because the act of restarting them removes them from the ArrayList
+      // and causes the indexes of all nodes above them in the list to change.
+      for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
+        LOG.info("restarting node with corrupt replica: position " 
+            + i + " node " + corruptReplicasDNIDs[i] + " " 
+            + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
+        cluster.restartDataNode(corruptReplicasDNIDs[i]);
       }
-    }
-    
-    // Restart the datanodes containing corrupt replicas 
-    // so they would be reported to namenode and re-replicated
-    // They MUST be restarted in reverse order from highest to lowest index,
-    // because the act of restarting them removes them from the ArrayList
-    // and causes the indexes of all nodes above them in the list to change.
-    for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
-      LOG.info("restarting node with corrupt replica: position " 
-          + i + " node " + corruptReplicasDNIDs[i] + " " 
-          + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
-      cluster.restartDataNode(corruptReplicasDNIDs[i]);
-    }
 
-    // Loop until all corrupt replicas are reported
-    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-        block, numCorruptReplicas);
+      // Loop until all corrupt replicas are reported
+      try {
+        DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
+                                        block, numCorruptReplicas);
+      } catch(TimeoutException e) {
+        if (k > ITERATIONS) {
+          throw e;
+        }
+        LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
+        continue;
+      }
+      break;
+    }
     
     // Loop until the block recovers after replication
     DFSTestUtil.waitReplication(fs, file1, numReplicas);