Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:00 UTC

svn commit: r1542125 [2/2] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/libexec/ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/h...

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java Thu Nov 14 23:56:56 2013
@@ -31,7 +31,6 @@ import java.security.PrivilegedException
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.TimeZone;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,20 +49,17 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ServletUtil;
 import org.xml.sax.Attributes;
@@ -83,7 +79,9 @@ import org.xml.sax.helpers.XMLReaderFact
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class HftpFileSystem extends FileSystem
-    implements DelegationTokenRenewer.Renewable {
+    implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
+  public static final String SCHEME = "hftp";
+
   static {
     HttpURLConnection.setFollowRedirects(true);
   }
@@ -100,19 +98,13 @@ public class HftpFileSystem extends File
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
 
+  private TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
   private Token<?> delegationToken;
   private Token<?> renewToken;
-  private static final HftpDelegationTokenSelector hftpTokenSelector =
-      new HftpDelegationTokenSelector();
-
-  private DelegationTokenRenewer dtRenewer = null;
 
-  private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
-    if (dtRenewer == null) {
-      dtRenewer = DelegationTokenRenewer.getInstance();
-    }
-
-    dtRenewer.addRenewAction(hftpFs);
+  @Override
+  public URI getCanonicalUri() {
+    return super.getCanonicalUri();
   }
 
   public static final SimpleDateFormat getDateFormat() {
@@ -177,7 +169,7 @@ public class HftpFileSystem extends File
    */
   @Override
   public String getScheme() {
-    return "hftp";
+    return SCHEME;
   }
 
   @Override
@@ -195,39 +187,10 @@ public class HftpFileSystem extends File
     }
 
     if (UserGroupInformation.isSecurityEnabled()) {
-      initDelegationToken();
-    }
-  }
-
-  protected void initDelegationToken() throws IOException {
-    // look for hftp token, then try hdfs
-    Token<?> token = selectDelegationToken(ugi);
-
-    // if we don't already have a token, go get one over https
-    boolean createdToken = false;
-    if (token == null) {
-      token = getDelegationToken(null);
-      createdToken = (token != null);
+      tokenAspect.initDelegationToken(ugi);
     }
-
-    // we already had a token or getDelegationToken() didn't fail.
-    if (token != null) {
-      setDelegationToken(token);
-      if (createdToken) {
-        addRenewAction(this);
-        LOG.debug("Created new DT for " + token.getService());
-      } else {
-        LOG.debug("Found existing DT for " + token.getService());
-      }
-    }
-  }
-
-  protected Token<DelegationTokenIdentifier> selectDelegationToken(
-      UserGroupInformation ugi) {
-    return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
   }
 
-
   @Override
   public Token<?> getRenewToken() {
     return renewToken;
@@ -242,16 +205,19 @@ public class HftpFileSystem extends File
 
   @Override
   public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+    /**
+     * XXX The kind of the token has been changed by DelegationTokenFetcher. We
+     * use the token for renewal, since the reflection utilities need the value
+     * of the kind field to correctly renew the token.
+     *
+     * For other operations, however, the client has to send a
+     * HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop
+     * 0.20.203 clusters. Later releases fix this problem. See HDFS-5440 for
+     * more details.
+     */
     renewToken = token;
-    // emulate the 203 usage of the tokens
-    // by setting the kind and service as if they were hdfs tokens
     delegationToken = new Token<T>(token);
-    // NOTE: the remote nn must be configured to use hdfs
     delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
-    // no need to change service because we aren't exactly sure what it
-    // should be.  we can guess, but it might be wrong if the local conf
-    // value is incorrect.  the service is a client side field, so the remote
-    // end does not care about the value
   }
 
   @Override
@@ -266,7 +232,7 @@ public class HftpFileSystem extends File
           final String nnHttpUrl = nnUri.toString();
           Credentials c;
           try {
-            c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+            c = DelegationTokenFetcher.getDTfromRemote(connectionFactory, nnUri, renewer);
           } catch (IOException e) {
             if (e.getCause() instanceof ConnectException) {
               LOG.warn("Couldn't connect to " + nnHttpUrl +
@@ -350,6 +316,7 @@ public class HftpFileSystem extends File
     String tokenString = null;
     if (UserGroupInformation.isSecurityEnabled()) {
       synchronized (this) {
+        tokenAspect.ensureTokenInitialized();
         if (delegationToken != null) {
           tokenString = delegationToken.encodeToUrlString();
           return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
@@ -419,9 +386,7 @@ public class HftpFileSystem extends File
   @Override
   public void close() throws IOException {
     super.close();
-    if (dtRenewer != null) {
-      dtRenewer.removeRenewAction(this); // blocks
-    }
+    tokenAspect.removeRenewAction();
   }
 
   /** Class to parse and store a listing reply from the server. */
@@ -696,67 +661,33 @@ public class HftpFileSystem extends File
     return cs != null? cs: super.getContentSummary(f);
   }
 
-  @InterfaceAudience.Private
-  public static class TokenManager extends TokenRenewer {
-
-    @Override
-    public boolean handleKind(Text kind) {
-      return kind.equals(TOKEN_KIND);
-    }
-
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    protected String getUnderlyingProtocol() {
-      return "http";
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public long renew(Token<?> token,
-                      Configuration conf) throws IOException {
-      // update the kerberos credentials, if they are coming from a keytab
-      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-      InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
-      return
-        DelegationTokenFetcher.renewDelegationToken
-        (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
-         (Token<DelegationTokenIdentifier>) token);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void cancel(Token<?> token,
-                       Configuration conf) throws IOException {
-      // update the kerberos credentials, if they are coming from a keytab
-      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-      InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
-      DelegationTokenFetcher.cancelDelegationToken
-        (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
-         (Token<DelegationTokenIdentifier>) token);
+  @SuppressWarnings("unchecked")
+  @Override
+  public long renewDelegationToken(Token<?> token) throws IOException {
+    // update the kerberos credentials, if they are coming from a keytab
+    UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+    InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+    try {
+      return DelegationTokenFetcher.renewDelegationToken(connectionFactory,
+          DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
+          (Token<DelegationTokenIdentifier>) token);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
     }
   }
 
-  private static class HftpDelegationTokenSelector
-  extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
-    private static final DelegationTokenSelector hdfsTokenSelector =
-        new DelegationTokenSelector();
-
-    public HftpDelegationTokenSelector() {
-      super(TOKEN_KIND);
-    }
-
-    Token<DelegationTokenIdentifier> selectToken(URI nnUri,
-        Collection<Token<?>> tokens, Configuration conf) {
-      Token<DelegationTokenIdentifier> token =
-          selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
-      if (token == null) {
-        // try to get a HDFS token
-        token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
-      }
-      return token;
+  @SuppressWarnings("unchecked")
+  @Override
+  public void cancelDelegationToken(Token<?> token) throws IOException {
+    // update the kerberos credentials, if they are coming from a keytab
+    UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+    InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+    try {
+      DelegationTokenFetcher.cancelDelegationToken(connectionFactory, DFSUtil
+          .createUri(getUnderlyingProtocol(), serviceAddr),
+          (Token<DelegationTokenIdentifier>) token);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
     }
   }
 }
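
Both HftpFileSystem here and WebHdfsFileSystem below now implement TokenAspect.TokenManagementDelegator, so the shared TokenAspect can drive token selection, renewal, and cancellation in place of the per-filesystem TokenManager and DtRenewer classes this commit deletes. The interface itself is not part of this diff; judging from the renewDelegationToken/cancelDelegationToken overrides above, it presumably looks roughly like the following sketch (names and javadoc inferred, not authoritative):

    // Hypothetical reconstruction of TokenAspect.TokenManagementDelegator,
    // inferred from the @Override methods above; not part of this commit.
    import java.io.IOException;
    import org.apache.hadoop.security.token.Token;

    interface TokenManagementDelegator {
      /** Renews the token; returns the new expiration time in milliseconds. */
      long renewDelegationToken(Token<?> token) throws IOException;

      /** Cancels the token at the issuing server. */
      void cancelDelegationToken(Token<?> token) throws IOException;
    }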

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java Thu Nov 14 23:56:56 2013
@@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -77,24 +77,28 @@ public class URLConnectionFactory {
    * @throws IOException
    */
   public URLConnection openConnection(URL url) throws IOException {
-    URLConnection connection = url.openConnection();
-    if (connection instanceof HttpURLConnection) {
-      connConfigurator.configure((HttpURLConnection) connection);
+    try {
+      return openConnection(url, false);
+    } catch (AuthenticationException e) {
+      // Unreachable
+      return null;
     }
-    return connection;
   }
 
   /**
    * Opens a url with read and connect timeouts
    *
-   * @param url URL to open
+   * @param url
+   *          URL to open
+   * @param isSpnego
+   *          whether the url should be authenticated via SPNEGO
    * @return URLConnection
    * @throws IOException
    * @throws AuthenticationException
    */
-  public URLConnection openConnection(HttpOpParam.Op op, URL url)
+  public URLConnection openConnection(URL url, boolean isSpnego)
       throws IOException, AuthenticationException {
-    if (op.getRequireAuth()) {
+    if (isSpnego) {
       if (LOG.isDebugEnabled()) {
        LOG.debug("open AuthenticatedURL connection " + url);
       }
@@ -106,7 +110,11 @@ public class URLConnectionFactory {
       if (LOG.isDebugEnabled()) {
         LOG.debug("open URL connection");
       }
-      return openConnection(url);
+      URLConnection connection = url.openConnection();
+      if (connection instanceof HttpURLConnection) {
+        connConfigurator.configure((HttpURLConnection) connection);
+      }
+      return connection;
     }
   }
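
The op-aware overload openConnection(HttpOpParam.Op, URL) is gone; callers now pass a plain boolean saying whether SPNEGO is required, which removes URLConnectionFactory's dependency on the WebHDFS-specific HttpOpParam type. A minimal caller under the new signature might look like this sketch (the wrapper class and method names are illustrative only; the IOException wrapping mirrors what WebHdfsFileSystem does in a later hunk of this commit):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;
    import org.apache.hadoop.security.authentication.client.AuthenticationException;

    class OpenConnectionSketch {
      static HttpURLConnection open(URLConnectionFactory factory, URL url,
          boolean requireAuth) throws IOException {
        try {
          // requireAuth == true routes the request through AuthenticatedURL
          // (SPNEGO); false opens a plain, configurator-tuned connection.
          return (HttpURLConnection) factory.openConnection(url, requireAuth);
        } catch (AuthenticationException e) {
          throw new IOException(e);
        }
      }
    }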
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Thu Nov 14 23:56:56 2013
@@ -30,7 +30,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -56,8 +55,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.web.TokenAspect.DTSelecorByKind;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
@@ -96,8 +95,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.util.Progressable;
 import org.mortbay.util.ajax.JSON;
 
@@ -107,7 +104,7 @@ import com.google.common.collect.Lists;
 
 /** A FileSystem for HDFS over the web. */
 public class WebHdfsFileSystem extends FileSystem
-    implements DelegationTokenRenewer.Renewable {
+    implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
   public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
   /** File System URI: {SCHEME}://namenode:port/path/to/file */
   public static final String SCHEME = "webhdfs";
@@ -122,13 +119,18 @@ public class WebHdfsFileSystem extends F
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
   /** Token selector */
-  public static final WebHdfsDelegationTokenSelector DT_SELECTOR
-      = new WebHdfsDelegationTokenSelector();
+  public static final DTSelecorByKind DT_SELECTOR
+      = new DTSelecorByKind(TOKEN_KIND);
 
   private DelegationTokenRenewer dtRenewer = null;
   @VisibleForTesting
   DelegationTokenRenewer.RenewAction<?> action;
 
+  @Override
+  public URI getCanonicalUri() {
+    return super.getCanonicalUri();
+  }
+
   @VisibleForTesting
   protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
     if (dtRenewer == null) {
@@ -142,7 +144,6 @@ public class WebHdfsFileSystem extends F
   public static boolean isEnabled(final Configuration conf, final Log log) {
     final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
         DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
-    log.info(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY + " = " + b);
     return b;
   }
 
@@ -490,7 +491,8 @@ public class WebHdfsFileSystem extends F
         throws IOException {
       final HttpURLConnection conn;
       try {
-        conn = (HttpURLConnection) connectionFactory.openConnection(op, url);
+        conn = (HttpURLConnection) connectionFactory.openConnection(url,
+            op.getRequireAuth());
       } catch (AuthenticationException e) {
         throw new IOException(e);
       }
@@ -986,7 +988,8 @@ public class WebHdfsFileSystem extends F
     }
   }
 
-  private synchronized long renewDelegationToken(final Token<?> token
+  @Override
+  public synchronized long renewDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
     TokenArgumentParam dtargParam = new TokenArgumentParam(
@@ -995,7 +998,8 @@ public class WebHdfsFileSystem extends F
     return (Long) m.get("long");
   }
 
-  private synchronized void cancelDelegationToken(final Token<?> token
+  @Override
+  public synchronized void cancelDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
     TokenArgumentParam dtargParam = new TokenArgumentParam(
@@ -1041,57 +1045,4 @@ public class WebHdfsFileSystem extends F
     final Map<?, ?> m = run(op, p);
     return JsonUtil.toMD5MD5CRC32FileChecksum(m);
   }
-
-  /** Delegation token renewer. */
-  public static class DtRenewer extends TokenRenewer {
-    @Override
-    public boolean handleKind(Text kind) {
-      return kind.equals(TOKEN_KIND);
-    }
-  
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    private static WebHdfsFileSystem getWebHdfs(
-        final Token<?> token, final Configuration conf) throws IOException {
-      
-      final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
-      final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
-      return (WebHdfsFileSystem)FileSystem.get(uri, conf);
-    }
-
-    @Override
-    public long renew(final Token<?> token, final Configuration conf
-        ) throws IOException, InterruptedException {
-      return getWebHdfs(token, conf).renewDelegationToken(token);
-    }
-  
-    @Override
-    public void cancel(final Token<?> token, final Configuration conf
-        ) throws IOException, InterruptedException {
-      getWebHdfs(token, conf).cancelDelegationToken(token);
-    }
-  }
-  
-  private static class WebHdfsDelegationTokenSelector
-  extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
-    private static final DelegationTokenSelector hdfsTokenSelector =
-        new DelegationTokenSelector();
-    
-    public WebHdfsDelegationTokenSelector() {
-      super(TOKEN_KIND);
-    }
-    
-    Token<DelegationTokenIdentifier> selectToken(URI nnUri,
-        Collection<Token<?>> tokens, Configuration conf) {
-      Token<DelegationTokenIdentifier> token =
-          selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
-      if (token == null) {
-        token = hdfsTokenSelector.selectToken(nnUri, tokens, conf); 
-      }
-      return token;
-    }
-  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Thu Nov 14 23:56:56 2013
@@ -13,5 +13,4 @@
 #
 org.apache.hadoop.hdfs.DFSClient$Renewer
 org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
-org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
-org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer
+org.apache.hadoop.hdfs.web.TokenAspect$TokenManager
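
The two scheme-specific renewers collapse into a single TokenAspect$TokenManager entry. This file is a standard java.util.ServiceLoader registration: Hadoop's Token class scans the registered TokenRenewer implementations and asks each whether it handles the token's kind. Roughly, the discovery pattern looks like the sketch below (this is an illustration of the mechanism, not the actual Token source):

    import java.util.ServiceLoader;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.TokenRenewer;

    class RenewerLookupSketch {
      // The first renewer that claims the kind wins; after this change, both
      // the hftp and webhdfs token kinds are claimed by TokenAspect$TokenManager.
      static TokenRenewer find(Text kind) {
        for (TokenRenewer r : ServiceLoader.load(TokenRenewer.class)) {
          if (r.handleKind(kind)) {
            return r;
          }
        }
        return null;
      }
    }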

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Thu Nov 14 23:56:56 2013
@@ -1517,6 +1517,18 @@
 </property>
 
 <property>
+  <name>dfs.namenode.path.based.cache.retry.interval.ms</name>
+  <value>60000</value>
+  <description>
+    When the NameNode needs to uncache something that is cached, or cache
+    something that is not cached, it must direct the DataNodes to do so by
+    sending a DNA_CACHE or DNA_UNCACHE command in response to a DataNode
+    heartbeat.  This parameter controls how frequently the NameNode will
+    resend these commands.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
   <value>4</value>
   <description>
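
TestFsDatasetCache further down in this commit exercises the new key by shortening the interval to 500 ms so that resent DNA_CACHE/DNA_UNCACHE commands arrive quickly. Overriding it programmatically looks like the following sketch (the constant name matches the one that test uses):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class CacheRetryIntervalSketch {
      static Configuration withFastCacheRetries() {
        Configuration conf = new HdfsConfiguration();
        // Resend pending cache/uncache commands every 500 ms instead of the
        // 60,000 ms default declared above.
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS, 500);
        return conf;
      }
    }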

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1535792-1536571,1536573-1542122

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html Thu Nov 14 23:56:56 2013
@@ -30,6 +30,12 @@
 </div>
 <div id="panel"></div>
 </div>
+<div class="row">
+<hr />
+<div class="col-xs-2"><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.</p></div>
+<div class="col-xs-1 pull-right"><a style="color: #ddd" href="dfshealth.jsp">Legacy UI</a></div>
+</div>
+
 <script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/2.0.3/jquery.min.js">
 </script><script type="text/javascript" src="//netdna.bootstrapcdn.com/bootstrap/3.0.0/js/bootstrap.min.js">
 </script><script type="text/javascript" src="/static/dust-full-2.0.0.min.js">
@@ -37,7 +43,5 @@
 </script><script type="text/javascript" src="dfs-dust.js">
 </script><script type="text/javascript" src="dfshealth.js">
 </script>
-<hr />
-<p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.</p>
 </body>
 </html>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Thu Nov 14 23:56:56 2013
@@ -70,6 +70,6 @@
 <hr/>
 <h3>Startup Progress</h3>
 <% healthjsp.generateStartupProgress(out, nn.getStartupProgress()); %>
-<%
-out.println(ServletUtil.htmlFooter());
-%>
+<hr/><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.&nbsp;<a href="dfshealth.html">New UI</a></p>
+</body>
+</html>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html Thu Nov 14 23:56:56 2013
@@ -14,22 +14,22 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-<meta HTTP-EQUIV="REFRESH" content="0;url=dfshealth.jsp"/>
-<html>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+    "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
 <head>
+<meta http-equiv="REFRESH" content="1;url=dfshealth.jsp" />
 <title>Hadoop Administration</title>
 </head>
-
 <body>
-
+<script type="text/javascript">
+//<![CDATA[
+window.location.href='dfshealth.html';
+//]]>
+</script>
 <h1>Hadoop Administration</h1>
-
 <ul>
-
 <li><a href="dfshealth.jsp">DFS Health/Status</a></li>
-
 </ul>
-
 </body>
-
 </html>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Thu Nov 14 23:56:56 2013
@@ -26,10 +26,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 /**
  * This tests whether the pipeline-recovery-related client protocol works correctly.
  */
@@ -112,4 +116,55 @@ public class TestClientProtocolForPipeli
       cluster.shutdown();
     }
   }
+
+  /** Test whether corrupt replicas are detected correctly during pipeline
+   * recoveries.
+   */
+  @Test
+  public void testPipelineRecoveryForLastBlock() throws IOException {
+    DFSClientFaultInjector faultInjector
+        = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.instance = faultInjector;
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 3;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol1.dat");
+      Mockito.when(faultInjector.failPacket()).thenReturn(true);
+      try {
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      } catch (IOException e) {
+        // completeFile() should fail.
+        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
+        return;
+      }
+
+      // At this point, the NN has let the data corruption happen.
+      // Before failing test, try reading the file. It should fail.
+      FSDataInputStream in = fileSys.open(file);
+      try {
+        int c = in.read();
+        // Test will fail with BlockMissingException if NN does not update the
+        // replica state based on the latest report.
+      } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
+        Assert.fail("Block is missing because the file was closed with"
+            + " corrupt replicas.");
+      }
+      Assert.fail("The file was closed with corrupt replicas, but read still"
+          + " works!");
+    } finally {
+      DFSClientFaultInjector.instance = oldInjector;
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java Thu Nov 14 23:56:56 2013
@@ -86,6 +86,9 @@ public class TestQuota {
     // Space quotas
     final int DEFAULT_BLOCK_SIZE = 512;
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    // Make it relinquish locks. When run serially, the result should
+    // be identical.
+    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     final FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -350,6 +353,7 @@ public class TestQuota {
       }
       assertTrue(hasException);
 
+      assertEquals(4, cluster.getNamesystem().getFSDirectory().getYieldCount());
     } finally {
       cluster.shutdown();
     }
@@ -360,6 +364,9 @@ public class TestQuota {
   @Test
   public void testNamespaceCommands() throws Exception {
     final Configuration conf = new HdfsConfiguration();
+    // Make it relinquish locks. When run serially, the result should
+    // be identical.
+    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     final FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -515,6 +522,7 @@ public class TestQuota {
       c = dfs.getContentSummary(quotaDir1);
       assertEquals(c.getDirectoryCount(), 6);
       assertEquals(c.getQuota(), 6);
+      assertEquals(14, cluster.getNamesystem().getFSDirectory().getYieldCount());
     } finally {
       cluster.shutdown();
     }
@@ -532,6 +540,9 @@ public class TestQuota {
     // set a smaller block size so that we can test with smaller 
     // diskspace quotas
     conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
+    // Make it relinquish locks. When run serially, the result should
+    // be identical.
+    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     final FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -764,6 +775,7 @@ public class TestQuota {
       assertEquals(c.getSpaceConsumed(),
           (sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace);
 
+      assertEquals(20, cluster.getNamesystem().getFSDirectory().getYieldCount());
     } finally {
       cluster.shutdown();
     }
@@ -905,6 +917,9 @@ public class TestQuota {
     final int BLOCK_SIZE = 6 * 1024;
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    // Make it relinquish locks. When run serially, the result should
+    // be identical.
+    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
     MiniDFSCluster cluster = 
       new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -971,6 +986,7 @@ public class TestQuota {
         exceededQuota = true;
       }
       assertTrue("Quota not exceeded", exceededQuota);
+      assertEquals(2, cluster.getNamesystem().getFSDirectory().getYieldCount());
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Thu Nov 14 23:56:56 2013
@@ -28,8 +28,10 @@ import static org.mockito.Mockito.doRetu
 
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
@@ -87,8 +89,7 @@ public class TestFsDatasetCache {
   private static FsDatasetSpi<?> fsd;
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
   private static PageRounder rounder = new PageRounder();
-
-  private Mlocker mlocker;
+  private static CacheManipulator prevCacheManipulator;
 
   @Before
   public void setUp() throws Exception {
@@ -96,6 +97,8 @@ public class TestFsDatasetCache {
     assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
     conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+        500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
@@ -113,8 +116,19 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
 
     spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
-    // Save the current mlocker and replace it at the end of the test
-    mlocker = MappableBlock.mlocker;
+
+    prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+
+    // Save the current CacheManipulator and replace it at the end of the test
+    // Stub out mlock calls to avoid failing when not enough memory is lockable
+    // by the operating system.
+    NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("mlocking " + identifier);
+      }
+    };
   }
 
   @After
@@ -125,8 +139,8 @@ public class TestFsDatasetCache {
     if (cluster != null) {
       cluster.shutdown();
     }
-    // Restore the original mlocker
-    MappableBlock.mlocker = mlocker;
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
@@ -214,8 +228,7 @@ public class TestFsDatasetCache {
     return expected;
   }
 
-  @Test(timeout=600000)
-  public void testCacheAndUncacheBlock() throws Exception {
+  private void testCacheAndUncacheBlock() throws Exception {
     LOG.info("beginning testCacheAndUncacheBlock");
     final int NUM_BLOCKS = 5;
 
@@ -269,6 +282,42 @@ public class TestFsDatasetCache {
   }
 
   @Test(timeout=600000)
+  public void testCacheAndUncacheBlockSimple() throws Exception {
+    testCacheAndUncacheBlock();
+  }
+
+  /**
+   * Run testCacheAndUncacheBlock with some failures injected into the mlock
+   * call.  This tests the ability of the NameNode to resend commands.
+   */
+  @Test(timeout=600000)
+  public void testCacheAndUncacheBlockWithRetries() throws Exception {
+    CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+    
+    try {
+      NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+        private final Set<String> seenIdentifiers = new HashSet<String>();
+        
+        @Override
+        public void mlock(String identifier,
+            ByteBuffer mmap, long length) throws IOException {
+          if (seenIdentifiers.contains(identifier)) {
+            // mlock succeeds the second time.
+            LOG.info("mlocking " + identifier);
+            return;
+          }
+          seenIdentifiers.add(identifier);
+          throw new IOException("injecting IOException during mlock of " +
+              identifier);
+        }
+      };
+      testCacheAndUncacheBlock();
+    } finally {
+      NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
+    }
+  }
+
+  @Test(timeout=600000)
   public void testFilesExceedMaxLockedMemory() throws Exception {
     LOG.info("beginning testFilesExceedMaxLockedMemory");
 
@@ -357,10 +406,11 @@ public class TestFsDatasetCache {
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
 
-    MappableBlock.mlocker = new MappableBlock.Mlocker() {
+    NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() {
       @Override
-      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
-        LOG.info("An mlock operation is starting.");
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("An mlock operation is starting on " + identifier);
         try {
           Thread.sleep(3000);
         } catch (InterruptedException e) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java Thu Nov 14 23:56:56 2013
@@ -93,6 +93,10 @@ public class TestCorruptFilesJsp  {
         in.close();
       }
 
+      try {
+        Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
+      } catch (InterruptedException ie) {}
+
       // verify if all corrupt files were reported to NN
       badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java Thu Nov 14 23:56:56 2013
@@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -81,15 +83,7 @@ public class TestPathBasedCacheRequests 
   static private MiniDFSCluster cluster;
   static private DistributedFileSystem dfs;
   static private NamenodeProtocols proto;
-
-  static {
-    MappableBlock.mlocker = new MappableBlock.Mlocker() {
-      @Override
-      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
-        // Stubbed out for testing
-      }
-    };
-  }
+  static private CacheManipulator prevCacheManipulator;
 
   @Before
   public void setup() throws Exception {
@@ -101,6 +95,18 @@ public class TestPathBasedCacheRequests 
     cluster.waitActive();
     dfs = cluster.getFileSystem();
     proto = cluster.getNameNodeRpc();
+    prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+
+    // Save the current CacheManipulator and replace it at the end of the test
+    // Stub out mlock calls to avoid failing when not enough memory is lockable
+    // by the operating system.
+    NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("mlocking " + identifier);
+      }
+    };
   }
 
   @After
@@ -108,6 +114,8 @@ public class TestPathBasedCacheRequests 
     if (cluster != null) {
       cluster.shutdown();
     }
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
   }
 
   @Test(timeout=60000)
@@ -552,8 +560,8 @@ public class TestPathBasedCacheRequests 
    * @throws Exception
    */
   private static void waitForCachedBlocks(NameNode nn,
-      final int expectedCachedBlocks, final int expectedCachedReplicas) 
-          throws Exception {
+      final int expectedCachedBlocks, final int expectedCachedReplicas,
+      final String logString) throws Exception {
     final FSNamesystem namesystem = nn.getNamesystem();
     final CacheManager cacheManager = namesystem.getCacheManager();
     LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
@@ -581,9 +589,9 @@ public class TestPathBasedCacheRequests 
             (numCachedReplicas == expectedCachedReplicas)) {
           return true;
         } else {
-          LOG.info("cached blocks: have " + numCachedBlocks +
-              " / " + expectedCachedBlocks);
-          LOG.info("cached replicas: have " + numCachedReplicas +
+          LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+              " / " + expectedCachedBlocks + ".  " +
+              "cached replicas: have " + numCachedReplicas +
               " / " + expectedCachedReplicas);
           return false;
         }
@@ -681,7 +689,7 @@ public class TestPathBasedCacheRequests 
         paths.add(p.toUri().getPath());
       }
       // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
       // Cache and check each path in sequence
       int expected = 0;
       for (int i=0; i<numFiles; i++) {
@@ -692,7 +700,8 @@ public class TestPathBasedCacheRequests 
               build();
         nnRpc.addPathBasedCacheDirective(directive);
         expected += numBlocksPerFile;
-        waitForCachedBlocks(namenode, expected, expected);
+        waitForCachedBlocks(namenode, expected, expected,
+            "testWaitForCachedReplicas:1");
       }
       // Uncache and check each path in sequence
       RemoteIterator<PathBasedCacheDirective> entries =
@@ -701,7 +710,8 @@ public class TestPathBasedCacheRequests 
         PathBasedCacheDirective directive = entries.next();
         nnRpc.removePathBasedCacheDirective(directive.getId());
         expected -= numBlocksPerFile;
-        waitForCachedBlocks(namenode, expected, expected);
+        waitForCachedBlocks(namenode, expected, expected,
+            "testWaitForCachedReplicas:2");
       }
     } finally {
       cluster.shutdown();
@@ -735,7 +745,8 @@ public class TestPathBasedCacheRequests 
         paths.add(p.toUri().getPath());
       }
       // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0,
+          "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:0");
       // Cache and check each path in sequence
       int expected = 0;
       for (int i=0; i<numFiles; i++) {
@@ -745,10 +756,12 @@ public class TestPathBasedCacheRequests 
               setPool(pool).
               build();
         dfs.addPathBasedCacheDirective(directive);
-        waitForCachedBlocks(namenode, expected, 0);
+        waitForCachedBlocks(namenode, expected, 0,
+          "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:1");
       }
       Thread.sleep(20000);
-      waitForCachedBlocks(namenode, expected, 0);
+      waitForCachedBlocks(namenode, expected, 0,
+          "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:2");
     } finally {
       cluster.shutdown();
     }
@@ -781,7 +794,8 @@ public class TestPathBasedCacheRequests 
         FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
             (int)BLOCK_SIZE, (short)3, false);
       }
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0,
+          "testWaitForCachedReplicasInDirectory:0");
       // cache entire directory
       long id = dfs.addPathBasedCacheDirective(
             new PathBasedCacheDirective.Builder().
@@ -789,10 +803,12 @@ public class TestPathBasedCacheRequests 
               setReplication((short)2).
               setPool(pool).
               build());
-      waitForCachedBlocks(namenode, 4, 8);
+      waitForCachedBlocks(namenode, 4, 8,
+          "testWaitForCachedReplicasInDirectory:1");
       // remove and watch numCached go to 0
       dfs.removePathBasedCacheDirective(id);
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0,
+          "testWaitForCachedReplicasInDirectory:2");
     } finally {
       cluster.shutdown();
     }
@@ -830,7 +846,7 @@ public class TestPathBasedCacheRequests 
         FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
             (int)BLOCK_SIZE, (short)3, false);
       }
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
       checkNumCachedReplicas(dfs, paths, 0, 0);
       // cache directory
       long id = dfs.addPathBasedCacheDirective(
@@ -839,7 +855,7 @@ public class TestPathBasedCacheRequests 
             setReplication((short)1).
             setPool(pool).
             build());
-      waitForCachedBlocks(namenode, 4, 4);
+      waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
       checkNumCachedReplicas(dfs, paths, 4, 4);
       // step up the replication factor
       for (int i=2; i<=3; i++) {
@@ -848,7 +864,7 @@ public class TestPathBasedCacheRequests 
             setId(id).
             setReplication((short)i).
             build());
-        waitForCachedBlocks(namenode, 4, 4*i);
+        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
         checkNumCachedReplicas(dfs, paths, 4, 4*i);
       }
       // step it down
@@ -858,12 +874,12 @@ public class TestPathBasedCacheRequests 
             setId(id).
             setReplication((short)i).
             build());
-        waitForCachedBlocks(namenode, 4, 4*i);
+        waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
         checkNumCachedReplicas(dfs, paths, 4, 4*i);
       }
       // remove and watch numCached go to 0
       dfs.removePathBasedCacheDirective(id);
-      waitForCachedBlocks(namenode, 0, 0);
+      waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
       checkNumCachedReplicas(dfs, paths, 0, 0);
     } finally {
       cluster.shutdown();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java Thu Nov 14 23:56:56 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -47,7 +48,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.io.IOUtils;
@@ -949,4 +953,54 @@ public class TestSnapshotDeletion {
     psOut.close();
     out.close();
   }
+
+  /*
+   * OP_DELETE_SNAPSHOT edits op was not decrementing the safemode threshold on
+   * restart in HA mode. HDFS-5504
+   */
+  @Test(timeout = 60000)
+  public void testHANNRestartAfterSnapshotDeletion() throws Exception {
+    hdfs.close();
+    cluster.shutdown();
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1)
+        .build();
+    cluster.transitionToActive(0);
+    // stop the standby namenode
+    NameNode snn = cluster.getNameNode(1);
+    snn.stop();
+
+    hdfs = (DistributedFileSystem) HATestUtil
+        .configureFailoverFs(cluster, conf);
+    Path dir = new Path("/dir");
+    Path subDir = new Path(dir, "sub");
+    hdfs.mkdirs(dir);
+    hdfs.allowSnapshot(dir);
+    for (int i = 0; i < 5; i++) {
+      DFSTestUtil.createFile(hdfs, new Path(subDir, "" + i), 100, (short) 1,
+          1024L);
+    }
+
+    // take snapshot
+    hdfs.createSnapshot(dir, "s0");
+
+    // delete the subdir
+    hdfs.delete(subDir, true);
+
+    // roll the edit log
+    NameNode ann = cluster.getNameNode(0);
+    ann.getRpcServer().rollEditLog();
+
+    hdfs.deleteSnapshot(dir, "s0");
+    // wait for the blocks deletion at namenode
+    Thread.sleep(2000);
+
+    NameNodeAdapter.abortEditLogs(ann);
+    cluster.restartNameNode(0, false);
+    cluster.transitionToActive(0);
+
+    // wait till the cluster becomes active
+    cluster.waitClusterUp();
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java Thu Nov 14 23:56:56 2013
@@ -22,7 +22,6 @@ import static org.apache.hadoop.fs.Commo
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.URI;
@@ -40,6 +39,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestHftpDelegationToken {
 
@@ -71,9 +71,8 @@ public class TestHftpDelegationToken {
 	});
     assertSame("wrong kind of file system", HftpFileSystem.class,
                  fs.getClass());
-    Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken");
-    renewToken.setAccessible(true);
-    assertSame("wrong token", token, renewToken.get(fs));
+    assertSame("wrong token", token,
+        Whitebox.getInternalState(fs, "renewToken"));
   }
 
   @Test
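
The Whitebox call above replaces the hand-rolled reflection that was
deleted. For reference, it does roughly the following (a sketch; the real
Whitebox.getInternalState also walks up the superclass chain):

    // Reflection equivalent of Whitebox.getInternalState(fs, "renewToken"):
    java.lang.reflect.Field f =
        HftpFileSystem.class.getDeclaredField("renewToken");
    f.setAccessible(true);
    Object renewToken = f.get(fs);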
@@ -81,7 +80,7 @@ public class TestHftpDelegationToken {
     SecurityUtilTestHelper.setTokenServiceUseIp(true);
 
     Configuration conf = new Configuration();
-    conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
+    conf.setClass("fs.hftp.impl", HftpFileSystem.class, FileSystem.class);
 
     int httpPort = 80;
     int httpsPort = 443;
@@ -90,21 +89,21 @@ public class TestHftpDelegationToken {
 
     // test with implicit default port
     URI fsUri = URI.create("hftp://localhost");
-    MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    HftpFileSystem fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpPort, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpPort, conf);
 
     // test with explicit default port
     // Make sure it uses the port from the hftp URI.
     fsUri = URI.create("hftp://localhost:"+httpPort);
-    fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpPort, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpPort, conf);
 
     // test with non-default port
     // Make sure it uses the port from the hftp URI.
     fsUri = URI.create("hftp://localhost:"+(httpPort+1));
-    fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpPort + 1, conf);
 
@@ -116,7 +115,7 @@ public class TestHftpDelegationToken {
     SecurityUtilTestHelper.setTokenServiceUseIp(true);
 
     Configuration conf = new Configuration();
-    conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class);
+    conf.setClass("fs.hsftp.impl", HsftpFileSystem.class, FileSystem.class);
 
     int httpPort = 80;
     int httpsPort = 443;
@@ -125,19 +124,19 @@ public class TestHftpDelegationToken {
 
     // test with implicit default port
     URI fsUri = URI.create("hsftp://localhost");
-    MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    HsftpFileSystem fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpsPort, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpsPort, conf);
 
     // test with explicit default port
     fsUri = URI.create("hsftp://localhost:"+httpsPort);
-    fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpsPort, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpsPort, conf);
 
     // test with non-default port
     fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
-    fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
     assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
     checkTokenSelection(fs, httpsPort+1, conf);
 
@@ -197,6 +196,9 @@ public class TestHftpDelegationToken {
     UserGroupInformation ugi =
         UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
 
+    @SuppressWarnings("unchecked")
+    TokenAspect<HftpFileSystem> aspect = (TokenAspect<HftpFileSystem>) Whitebox.getInternalState(fs, "tokenAspect");
+
     // use ip-based tokens
     SecurityUtilTestHelper.setTokenServiceUseIp(true);
 
@@ -208,7 +210,7 @@ public class TestHftpDelegationToken {
     ugi.addToken(hdfsToken);
 
     // test fallback to hdfs token
-    Token<?> token = fs.selectDelegationToken(ugi);
+    Token<?> token = aspect.selectDelegationToken(ugi);
     assertNotNull(token);
     assertEquals(hdfsToken, token);
 
@@ -217,13 +219,13 @@ public class TestHftpDelegationToken {
         new byte[0], new byte[0],
         HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
     ugi.addToken(hftpToken);
-    token = fs.selectDelegationToken(ugi);
+    token = aspect.selectDelegationToken(ugi);
     assertNotNull(token);
     assertEquals(hftpToken, token);
 
     // switch to using host-based tokens, no token should match
     SecurityUtilTestHelper.setTokenServiceUseIp(false);
-    token = fs.selectDelegationToken(ugi);
+    token = aspect.selectDelegationToken(ugi);
     assertNull(token);
 
     // test fallback to hdfs token
@@ -232,7 +234,7 @@ public class TestHftpDelegationToken {
         DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
         new Text("localhost:8020"));
     ugi.addToken(hdfsToken);
-    token = fs.selectDelegationToken(ugi);
+    token = aspect.selectDelegationToken(ugi);
     assertNotNull(token);
     assertEquals(hdfsToken, token);
 
@@ -241,36 +243,8 @@ public class TestHftpDelegationToken {
         new byte[0], new byte[0],
         HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
     ugi.addToken(hftpToken);
-    token = fs.selectDelegationToken(ugi);
+    token = aspect.selectDelegationToken(ugi);
     assertNotNull(token);
     assertEquals(hftpToken, token);
   }
-
-  static class MyHftpFileSystem extends HftpFileSystem {
-    @Override
-    public URI getCanonicalUri() {
-      return super.getCanonicalUri();
-    }
-    @Override
-    public int getDefaultPort() {
-      return super.getDefaultPort();
-    }
-    // don't automatically get a token
-    @Override
-    protected void initDelegationToken() throws IOException {}
-  }
-
-  static class MyHsftpFileSystem extends HsftpFileSystem {
-    @Override
-    public URI getCanonicalUri() {
-      return super.getCanonicalUri();
-    }
-    @Override
-    public int getDefaultPort() {
-      return super.getDefaultPort();
-    }
-    // don't automatically get a token
-    @Override
-    protected void initDelegationToken() throws IOException {}
-  }
 }
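
The checkTokenSelection calls above drive aspect.selectDelegationToken(ugi),
which amounts to matching a credential by token kind and service, with the
fallback to an HDFS token that the test also exercises. A hedged sketch of
the kind/service match (TokenAspect's actual selector code differs):

    // 'service' is the ip- or host-based Text the test builds,
    // e.g. new Text("127.0.0.1:" + port).
    Token<?> selected = null;
    for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
      if (HftpFileSystem.TOKEN_KIND.equals(t.getKind())
          && service.equals(t.getService())) {
        selected = t;
        break;
      }
    }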

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java Thu Nov 14 23:56:56 2013
@@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.Map;
@@ -37,10 +39,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
 import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -59,6 +63,7 @@ import org.jboss.netty.channel.socket.ni
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpRequest;
 import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -78,9 +83,10 @@ public class TestDelegationTokenRemoteFe
 
   private static final String EXP_DATE = "124123512361236";
   private static final String tokenFile = "http.file.dta";
+  private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
 
   private int httpPort;
-  private String serviceUrl;
+  private URI serviceUrl;
   private FileSystem fileSys;
   private Configuration conf;
   private ServerBootstrap bootstrap;
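
The new connectionFactory constant is threaded through every fetcher call in
this test. In a sketch, its job is to be the single place connections are
created (assuming the factory's openConnection(URL) method; the servlet path
below is illustrative):

    // Open a connection through the shared factory so timeouts and auth
    // setup are applied consistently (sketch).
    HttpURLConnection conn = (HttpURLConnection) connectionFactory
        .openConnection(new URL(serviceUrl.toString() + "/renewDelegationToken"));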
@@ -92,7 +98,7 @@ public class TestDelegationTokenRemoteFe
     conf = new Configuration();
     fileSys = FileSystem.getLocal(conf);
     httpPort = NetUtils.getFreeSocketPort();
-    serviceUrl = "http://localhost:" + httpPort;
+    serviceUrl = new URI("http://localhost:" + httpPort);
     testToken = createToken(serviceUrl);
   }
 
@@ -121,9 +127,9 @@ public class TestDelegationTokenRemoteFe
    * try to fetch token without http server with IOException
    */
   @Test
-  public void testTokenRenewFail() {
+  public void testTokenRenewFail() throws AuthenticationException {
     try {
-      DelegationTokenFetcher.renewDelegationToken(serviceUrl, testToken);
+      DelegationTokenFetcher.renewDelegationToken(connectionFactory, serviceUrl, testToken);
       fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
     } catch (IOException ex) {
     } 
@@ -133,9 +139,9 @@ public class TestDelegationTokenRemoteFe
    * try cancel token without http server with IOException
    */
   @Test
-  public void expectedTokenCancelFail() {
+  public void expectedTokenCancelFail() throws AuthenticationException {
     try {
-      DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+      DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl, testToken);
       fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
     } catch (IOException ex) {
     } 
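
Both failure tests use an empty catch block as the success path. An
equivalent JUnit 4 formulation, purely a style alternative (the method name
is hypothetical; it trades away the custom fail() message):

    // Expect the IOException directly instead of catch-and-ignore (sketch).
    @Test(expected = IOException.class)
    public void testTokenRenewFailAlt() throws Exception {
      DelegationTokenFetcher.renewDelegationToken(connectionFactory,
          serviceUrl, testToken);
    }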
@@ -145,11 +151,12 @@ public class TestDelegationTokenRemoteFe
    * try fetch token and get http response with error
    */
   @Test  
-  public void expectedTokenRenewErrorHttpResponse() {
+  public void expectedTokenRenewErrorHttpResponse()
+      throws AuthenticationException, URISyntaxException {
     bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
     try {
-      DelegationTokenFetcher.renewDelegationToken(serviceUrl + "/exception", 
-          createToken(serviceUrl));
+      DelegationTokenFetcher.renewDelegationToken(connectionFactory, new URI(
+          serviceUrl.toString() + "/exception"), createToken(serviceUrl));
       fail("Token fetcher shouldn't be able to renew tokens using an invalid"
           + " NN URL");
     } catch (IOException ex) {
@@ -159,13 +166,14 @@ public class TestDelegationTokenRemoteFe
   }
   
  /**
-   *   
   * Cancel a delegation token against the test HTTP server.
   */
   @Test
-  public void testCancelTokenFromHttp() throws IOException {
+  public void testCancelTokenFromHttp() throws IOException,
+      AuthenticationException {
     bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
-    DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+    DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl,
+        testToken);
     if (assertionError != null)
       throw assertionError;
   }
@@ -174,11 +182,12 @@ public class TestDelegationTokenRemoteFe
    * Call renew token using http server return new expiration time
    */
   @Test
-  public void testRenewTokenFromHttp() throws IOException {
+  public void testRenewTokenFromHttp() throws IOException,
+      NumberFormatException, AuthenticationException {
     bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
     assertTrue("testRenewTokenFromHttp error",
         Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
-            serviceUrl, testToken));
+            connectionFactory, serviceUrl, testToken));
     if (assertionError != null)
       throw assertionError;
   }
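
The assertTrue above leans on auto-unboxing of Long.valueOf(EXP_DATE) for a
primitive == comparison. An equivalent that reads more directly (a sketch
using the same calls as the test):

    // renewDelegationToken returns the new expiration time as a long.
    long expiry = DelegationTokenFetcher.renewDelegationToken(
        connectionFactory, serviceUrl, testToken);
    assertEquals(Long.parseLong(EXP_DATE), expiry);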
@@ -204,11 +213,11 @@ public class TestDelegationTokenRemoteFe
       throw assertionError;
   }
   
-  private static Token<DelegationTokenIdentifier> createToken(String serviceUri) {
+  private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
     byte[] pw = "hadoop".getBytes();
     byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
         "renewer"), new Text("realuser")).getBytes();
-    Text service = new Text(serviceUri);
+    Text service = new Text(serviceUri.toString());
     return new Token<DelegationTokenIdentifier>(ident, pw,
         HftpFileSystem.TOKEN_KIND, service);
   }
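
createToken now takes a URI but still records it verbatim as the token's
service Text, which is what service-based token selection keys on. A small
usage sketch (the URI is illustrative; assumes a test method declared to
throw Exception):

    Token<DelegationTokenIdentifier> t =
        createToken(new URI("http://localhost:50470"));
    assertEquals("http://localhost:50470", t.getService().toString());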
@@ -301,8 +310,15 @@ public class TestDelegationTokenRemoteFe
     public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
         throws Exception {
       HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != GET) {
-        return;
+
+      if (request.getMethod() == HttpMethod.OPTIONS) {
+        // Mimic SPNEGO authentication
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
+            HttpResponseStatus.OK);
+        response.addHeader("Set-Cookie", "hadoop-auth=1234");
+        e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      } else if (request.getMethod() != GET) {
+        e.getChannel().close();
       }
       UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
           .iterator();
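
The OPTIONS branch above mimics the server half of the SPNEGO-style
handshake the hadoop-auth client performs: the client sends a probe, the
server answers 200 with an auth cookie, and later requests carry that
cookie. The client side, in sketch form (AuthenticatedURL is the real
hadoop-auth client class; the URL is illustrative, using the test's
httpPort):

    // First call authenticates and stores the cookie in authToken...
    AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
    HttpURLConnection conn = new AuthenticatedURL()
        .openConnection(new URL("http://localhost:" + httpPort), authToken);
    // ...subsequent requests reuse authToken ("hadoop-auth=1234" above).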
@@ -338,7 +354,7 @@ public class TestDelegationTokenRemoteFe
   }
 
   private ServerBootstrap startHttpServer(int port,
-      final Token<DelegationTokenIdentifier> token, final String url) {
+      final Token<DelegationTokenIdentifier> token, final URI url) {
     ServerBootstrap bootstrap = new ServerBootstrap(
         new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
             Executors.newCachedThreadPool()));
@@ -348,7 +364,7 @@ public class TestDelegationTokenRemoteFe
       public ChannelPipeline getPipeline() throws Exception {
         return Channels.pipeline(new HttpRequestDecoder(),
             new HttpChunkAggregator(65536), new HttpResponseEncoder(),
-            new CredentialsLogicHandler(token, url));
+            new CredentialsLogicHandler(token, url.toString()));
       }
     });
     bootstrap.bind(new InetSocketAddress("localhost", port));