You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:00 UTC
svn commit: r1542125 [2/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/libexec/ hadoop-hdfs/ hadoop-hdfs/src/main/java/
hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/
hadoop-hdfs/src/main/java/org/apache/h...
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java Thu Nov 14 23:56:56 2013
@@ -31,7 +31,6 @@ import java.security.PrivilegedException
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.TimeZone;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,20 +49,17 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ServletUtil;
import org.xml.sax.Attributes;
@@ -83,7 +79,9 @@ import org.xml.sax.helpers.XMLReaderFact
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HftpFileSystem extends FileSystem
- implements DelegationTokenRenewer.Renewable {
+ implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
+ public static final String SCHEME = "hftp";
+
static {
HttpURLConnection.setFollowRedirects(true);
}
@@ -100,19 +98,13 @@ public class HftpFileSystem extends File
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+ private TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
private Token<?> delegationToken;
private Token<?> renewToken;
- private static final HftpDelegationTokenSelector hftpTokenSelector =
- new HftpDelegationTokenSelector();
-
- private DelegationTokenRenewer dtRenewer = null;
- private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
- if (dtRenewer == null) {
- dtRenewer = DelegationTokenRenewer.getInstance();
- }
-
- dtRenewer.addRenewAction(hftpFs);
+ @Override
+ public URI getCanonicalUri() {
+ return super.getCanonicalUri();
}
public static final SimpleDateFormat getDateFormat() {
@@ -177,7 +169,7 @@ public class HftpFileSystem extends File
*/
@Override
public String getScheme() {
- return "hftp";
+ return SCHEME;
}
@Override
@@ -195,39 +187,10 @@ public class HftpFileSystem extends File
}
if (UserGroupInformation.isSecurityEnabled()) {
- initDelegationToken();
- }
- }
-
- protected void initDelegationToken() throws IOException {
- // look for hftp token, then try hdfs
- Token<?> token = selectDelegationToken(ugi);
-
- // if we don't already have a token, go get one over https
- boolean createdToken = false;
- if (token == null) {
- token = getDelegationToken(null);
- createdToken = (token != null);
+ tokenAspect.initDelegationToken(ugi);
}
-
- // we already had a token or getDelegationToken() didn't fail.
- if (token != null) {
- setDelegationToken(token);
- if (createdToken) {
- addRenewAction(this);
- LOG.debug("Created new DT for " + token.getService());
- } else {
- LOG.debug("Found existing DT for " + token.getService());
- }
- }
- }
-
- protected Token<DelegationTokenIdentifier> selectDelegationToken(
- UserGroupInformation ugi) {
- return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
}
-
@Override
public Token<?> getRenewToken() {
return renewToken;
@@ -242,16 +205,19 @@ public class HftpFileSystem extends File
@Override
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+ /**
+ * XXX The kind of the token has been changed by DelegationTokenFetcher. We
+ * use the token for renewal, since the reflection utilities need the value
+ * of the kind field to correctly renew the token.
+ *
+ * For other operations, however, the client has to send an
+ * HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop
+ * 0.20.3 clusters. Later releases fix this problem. See HDFS-5440 for more
+ * details.
+ */
renewToken = token;
- // emulate the 203 usage of the tokens
- // by setting the kind and service as if they were hdfs tokens
delegationToken = new Token<T>(token);
- // NOTE: the remote nn must be configured to use hdfs
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
- // no need to change service because we aren't exactly sure what it
- // should be. we can guess, but it might be wrong if the local conf
- // value is incorrect. the service is a client side field, so the remote
- // end does not care about the value
}
@Override
@@ -266,7 +232,7 @@ public class HftpFileSystem extends File
final String nnHttpUrl = nnUri.toString();
Credentials c;
try {
- c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+ c = DelegationTokenFetcher.getDTfromRemote(connectionFactory, nnUri, renewer);
} catch (IOException e) {
if (e.getCause() instanceof ConnectException) {
LOG.warn("Couldn't connect to " + nnHttpUrl +
@@ -350,6 +316,7 @@ public class HftpFileSystem extends File
String tokenString = null;
if (UserGroupInformation.isSecurityEnabled()) {
synchronized (this) {
+ tokenAspect.ensureTokenInitialized();
if (delegationToken != null) {
tokenString = delegationToken.encodeToUrlString();
return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
@@ -419,9 +386,7 @@ public class HftpFileSystem extends File
@Override
public void close() throws IOException {
super.close();
- if (dtRenewer != null) {
- dtRenewer.removeRenewAction(this); // blocks
- }
+ tokenAspect.removeRenewAction();
}
/** Class to parse and store a listing reply from the server. */
@@ -696,67 +661,33 @@ public class HftpFileSystem extends File
return cs != null? cs: super.getContentSummary(f);
}
- @InterfaceAudience.Private
- public static class TokenManager extends TokenRenewer {
-
- @Override
- public boolean handleKind(Text kind) {
- return kind.equals(TOKEN_KIND);
- }
-
- @Override
- public boolean isManaged(Token<?> token) throws IOException {
- return true;
- }
-
- protected String getUnderlyingProtocol() {
- return "http";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public long renew(Token<?> token,
- Configuration conf) throws IOException {
- // update the kerberos credentials, if they are coming from a keytab
- UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
- InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
- return
- DelegationTokenFetcher.renewDelegationToken
- (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
- (Token<DelegationTokenIdentifier>) token);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void cancel(Token<?> token,
- Configuration conf) throws IOException {
- // update the kerberos credentials, if they are coming from a keytab
- UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
- InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
- DelegationTokenFetcher.cancelDelegationToken
- (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
- (Token<DelegationTokenIdentifier>) token);
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renewDelegationToken(Token<?> token) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+ try {
+ return DelegationTokenFetcher.renewDelegationToken(connectionFactory,
+ DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
+ (Token<DelegationTokenIdentifier>) token);
+ } catch (AuthenticationException e) {
+ throw new IOException(e);
}
}
- private static class HftpDelegationTokenSelector
- extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
- private static final DelegationTokenSelector hdfsTokenSelector =
- new DelegationTokenSelector();
-
- public HftpDelegationTokenSelector() {
- super(TOKEN_KIND);
- }
-
- Token<DelegationTokenIdentifier> selectToken(URI nnUri,
- Collection<Token<?>> tokens, Configuration conf) {
- Token<DelegationTokenIdentifier> token =
- selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
- if (token == null) {
- // try to get a HDFS token
- token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
- }
- return token;
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancelDelegationToken(Token<?> token) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+ try {
+ DelegationTokenFetcher.cancelDelegationToken(connectionFactory, DFSUtil
+ .createUri(getUnderlyingProtocol(), serviceAddr),
+ (Token<DelegationTokenIdentifier>) token);
+ } catch (AuthenticationException e) {
+ throw new IOException(e);
}
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java Thu Nov 14 23:56:56 2013
@@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -77,24 +77,28 @@ public class URLConnectionFactory {
* @throws IOException
*/
public URLConnection openConnection(URL url) throws IOException {
- URLConnection connection = url.openConnection();
- if (connection instanceof HttpURLConnection) {
- connConfigurator.configure((HttpURLConnection) connection);
+ try {
+ return openConnection(url, false);
+ } catch (AuthenticationException e) {
+ // Unreachable
+ return null;
}
- return connection;
}
/**
* Opens a url with read and connect timeouts
*
- * @param url URL to open
+ * @param url
+ * URL to open
+ * @param isSpnego
+ * whether the url should be authenticated via SPNEGO
* @return URLConnection
* @throws IOException
* @throws AuthenticationException
*/
- public URLConnection openConnection(HttpOpParam.Op op, URL url)
+ public URLConnection openConnection(URL url, boolean isSpnego)
throws IOException, AuthenticationException {
- if (op.getRequireAuth()) {
+ if (isSpnego) {
if (LOG.isDebugEnabled()) {
LOG.debug("open AuthenticatedURL connection" + url);
}
@@ -106,7 +110,11 @@ public class URLConnectionFactory {
if (LOG.isDebugEnabled()) {
LOG.debug("open URL connection");
}
- return openConnection(url);
+ URLConnection connection = url.openConnection();
+ if (connection instanceof HttpURLConnection) {
+ connConfigurator.configure((HttpURLConnection) connection);
+ }
+ return connection;
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Thu Nov 14 23:56:56 2013
@@ -30,7 +30,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@@ -56,8 +55,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.web.TokenAspect.DTSelecorByKind;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
@@ -96,8 +95,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.util.Progressable;
import org.mortbay.util.ajax.JSON;
@@ -107,7 +104,7 @@ import com.google.common.collect.Lists;
/** A FileSystem for HDFS over the web. */
public class WebHdfsFileSystem extends FileSystem
- implements DelegationTokenRenewer.Renewable {
+ implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
/** File System URI: {SCHEME}://namenode:port/path/to/file */
public static final String SCHEME = "webhdfs";
@@ -122,13 +119,18 @@ public class WebHdfsFileSystem extends F
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */
- public static final WebHdfsDelegationTokenSelector DT_SELECTOR
- = new WebHdfsDelegationTokenSelector();
+ public static final DTSelecorByKind DT_SELECTOR
+ = new DTSelecorByKind(TOKEN_KIND);
private DelegationTokenRenewer dtRenewer = null;
@VisibleForTesting
DelegationTokenRenewer.RenewAction<?> action;
+ @Override
+ public URI getCanonicalUri() {
+ return super.getCanonicalUri();
+ }
+
@VisibleForTesting
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
if (dtRenewer == null) {
@@ -142,7 +144,6 @@ public class WebHdfsFileSystem extends F
public static boolean isEnabled(final Configuration conf, final Log log) {
final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
- log.info(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY + " = " + b);
return b;
}
@@ -490,7 +491,8 @@ public class WebHdfsFileSystem extends F
throws IOException {
final HttpURLConnection conn;
try {
- conn = (HttpURLConnection) connectionFactory.openConnection(op, url);
+ conn = (HttpURLConnection) connectionFactory.openConnection(url,
+ op.getRequireAuth());
} catch (AuthenticationException e) {
throw new IOException(e);
}
@@ -986,7 +988,8 @@ public class WebHdfsFileSystem extends F
}
}
- private synchronized long renewDelegationToken(final Token<?> token
+ @Override
+ public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
TokenArgumentParam dtargParam = new TokenArgumentParam(
@@ -995,7 +998,8 @@ public class WebHdfsFileSystem extends F
return (Long) m.get("long");
}
- private synchronized void cancelDelegationToken(final Token<?> token
+ @Override
+ public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
TokenArgumentParam dtargParam = new TokenArgumentParam(
@@ -1041,57 +1045,4 @@ public class WebHdfsFileSystem extends F
final Map<?, ?> m = run(op, p);
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
}
-
- /** Delegation token renewer. */
- public static class DtRenewer extends TokenRenewer {
- @Override
- public boolean handleKind(Text kind) {
- return kind.equals(TOKEN_KIND);
- }
-
- @Override
- public boolean isManaged(Token<?> token) throws IOException {
- return true;
- }
-
- private static WebHdfsFileSystem getWebHdfs(
- final Token<?> token, final Configuration conf) throws IOException {
-
- final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
- final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
- return (WebHdfsFileSystem)FileSystem.get(uri, conf);
- }
-
- @Override
- public long renew(final Token<?> token, final Configuration conf
- ) throws IOException, InterruptedException {
- return getWebHdfs(token, conf).renewDelegationToken(token);
- }
-
- @Override
- public void cancel(final Token<?> token, final Configuration conf
- ) throws IOException, InterruptedException {
- getWebHdfs(token, conf).cancelDelegationToken(token);
- }
- }
-
- private static class WebHdfsDelegationTokenSelector
- extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
- private static final DelegationTokenSelector hdfsTokenSelector =
- new DelegationTokenSelector();
-
- public WebHdfsDelegationTokenSelector() {
- super(TOKEN_KIND);
- }
-
- Token<DelegationTokenIdentifier> selectToken(URI nnUri,
- Collection<Token<?>> tokens, Configuration conf) {
- Token<DelegationTokenIdentifier> token =
- selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
- if (token == null) {
- token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
- }
- return token;
- }
- }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Thu Nov 14 23:56:56 2013
@@ -13,5 +13,4 @@
#
org.apache.hadoop.hdfs.DFSClient$Renewer
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
-org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
-org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer
+org.apache.hadoop.hdfs.web.TokenAspect$TokenManager
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Thu Nov 14 23:56:56 2013
@@ -1517,6 +1517,18 @@
</property>
<property>
+ <name>dfs.namenode.path.based.cache.retry.interval.ms</name>
+ <value>60000</value>
+ <description>
+ When the NameNode needs to uncache something that is cached, or cache
+ something that is not cached, it must direct the DataNodes to do so by
+ sending a DNA_CACHE or DNA_UNCACHE command in response to a DataNode
+ heartbeat. This parameter controls how frequently the NameNode will
+ resend these commands.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
<value>4</value>
<description>
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1535792-1536571,1536573-1542122
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html Thu Nov 14 23:56:56 2013
@@ -30,6 +30,12 @@
</div>
<div id="panel"></div>
</div>
+<div class="row">
+<hr />
+<div class="col-xs-2"><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.</p></div>
+<div class="col-xs-1 pull-right"><a style="color: #ddd" href="dfshealth.jsp">Legacy UI</a></div>
+</div>
+
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/2.0.3/jquery.min.js">
</script><script type="text/javascript" src="//netdna.bootstrapcdn.com/bootstrap/3.0.0/js/bootstrap.min.js">
</script><script type="text/javascript" src="/static/dust-full-2.0.0.min.js">
@@ -37,7 +43,5 @@
</script><script type="text/javascript" src="dfs-dust.js">
</script><script type="text/javascript" src="dfshealth.js">
</script>
-<hr />
-<p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.</p>
</body>
</html>
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp Thu Nov 14 23:56:56 2013
@@ -70,6 +70,6 @@
<hr/>
<h3>Startup Progress</h3>
<% healthjsp.generateStartupProgress(out, nn.getStartupProgress()); %>
-<%
-out.println(ServletUtil.htmlFooter());
-%>
+<hr/><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013. <a href="dfshealth.html">New UI</a></p>
+</body>
+</html>
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html Thu Nov 14 23:56:56 2013
@@ -14,22 +14,22 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<meta HTTP-EQUIV="REFRESH" content="0;url=dfshealth.jsp"/>
-<html>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
+ "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
<head>
+<meta http-equiv="REFRESH" content="1;url=dfshealth.jsp" />
<title>Hadoop Administration</title>
</head>
-
<body>
-
+<script type="text/javascript">
+//<![CDATA[
+window.location.href='dfshealth.html';
+//]]>
+</script>
<h1>Hadoop Administration</h1>
-
<ul>
-
<li><a href="dfshealth.jsp">DFS Health/Status</a></li>
-
</ul>
-
</body>
-
</html>
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Thu Nov 14 23:56:56 2013
@@ -26,10 +26,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
/**
* This tests pipeline recovery related client protocol works correct or not.
*/
@@ -112,4 +116,55 @@ public class TestClientProtocolForPipeli
cluster.shutdown();
}
}
+
+ /** Test whether corrupt replicas are detected correctly during pipeline
+ * recoveries.
+ */
+ @Test
+ public void testPipelineRecoveryForLastBlock() throws IOException {
+ DFSClientFaultInjector faultInjector
+ = Mockito.mock(DFSClientFaultInjector.class);
+ DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
+ DFSClientFaultInjector.instance = faultInjector;
+ Configuration conf = new HdfsConfiguration();
+
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+ MiniDFSCluster cluster = null;
+
+ try {
+ int numDataNodes = 3;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+
+ Path file = new Path("dataprotocol1.dat");
+ Mockito.when(faultInjector.failPacket()).thenReturn(true);
+ try {
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ } catch (IOException e) {
+ // completeFile() should fail.
+ Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
+ return;
+ }
+
+ // At this point, the NN has allowed data corruption to happen.
+ // Before failing the test, try reading the file. It should fail.
+ FSDataInputStream in = fileSys.open(file);
+ try {
+ int c = in.read();
+ // Test will fail with BlockMissingException if NN does not update the
+ // replica state based on the latest report.
+ } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
+ Assert.fail("Block is missing because the file was closed with"
+ + " corrupt replicas.");
+ }
+ Assert.fail("The file was closed with corrupt replicas, but read still"
+ + " works!");
+ } finally {
+ DFSClientFaultInjector.instance = oldInjector;
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java Thu Nov 14 23:56:56 2013
@@ -86,6 +86,9 @@ public class TestQuota {
// Space quotas
final int DEFAULT_BLOCK_SIZE = 512;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ // Make it relinquish locks. When run serially, the result should
+ // be identical.
+ conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
@@ -350,6 +353,7 @@ public class TestQuota {
}
assertTrue(hasException);
+ assertEquals(4, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally {
cluster.shutdown();
}
@@ -360,6 +364,9 @@ public class TestQuota {
@Test
public void testNamespaceCommands() throws Exception {
final Configuration conf = new HdfsConfiguration();
+ // Make it relinquish locks. When run serially, the result should
+ // be identical.
+ conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
@@ -515,6 +522,7 @@ public class TestQuota {
c = dfs.getContentSummary(quotaDir1);
assertEquals(c.getDirectoryCount(), 6);
assertEquals(c.getQuota(), 6);
+ assertEquals(14, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally {
cluster.shutdown();
}
@@ -532,6 +540,9 @@ public class TestQuota {
// set a smaller block size so that we can test with smaller
// diskspace quotas
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
+ // Make it relinquish locks. When run serially, the result should
+ // be identical.
+ conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
@@ -764,6 +775,7 @@ public class TestQuota {
assertEquals(c.getSpaceConsumed(),
(sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace);
+ assertEquals(20, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally {
cluster.shutdown();
}
@@ -905,6 +917,9 @@ public class TestQuota {
final int BLOCK_SIZE = 6 * 1024;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+ // Use a tiny content summary limit so the NameNode relinquishes its locks.
+ // When run serially, the result should be identical.
+ conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@@ -971,6 +986,7 @@ public class TestQuota {
exceededQuota = true;
}
assertTrue("Quota not exceeded", exceededQuota);
+ assertEquals(2, cluster.getNamesystem().getFSDirectory().getYieldCount());
} finally {
cluster.shutdown();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Thu Nov 14 23:56:56 2013
@@ -28,8 +28,10 @@ import static org.mockito.Mockito.doRetu
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
@@ -87,8 +89,7 @@ public class TestFsDatasetCache {
private static FsDatasetSpi<?> fsd;
private static DatanodeProtocolClientSideTranslatorPB spyNN;
private static PageRounder rounder = new PageRounder();
-
- private Mlocker mlocker;
+ private static CacheManipulator prevCacheManipulator;
@Before
public void setUp() throws Exception {
@@ -96,6 +97,8 @@ public class TestFsDatasetCache {
assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+ 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY);
@@ -113,8 +116,19 @@ public class TestFsDatasetCache {
fsd = dn.getFSDataset();
spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
- // Save the current mlocker and replace it at the end of the test
- mlocker = MappableBlock.mlocker;
+
+ prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+
+ // The current CacheManipulator was saved above and is restored at the end of
+ // the test. Stub out mlock calls to avoid failing when not enough memory is
+ // lockable by the operating system.
+ NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+ @Override
+ public void mlock(String identifier,
+ ByteBuffer mmap, long length) throws IOException {
+ LOG.info("mlocking " + identifier);
+ }
+ };
}
@After
@@ -125,8 +139,8 @@ public class TestFsDatasetCache {
if (cluster != null) {
cluster.shutdown();
}
- // Restore the original mlocker
- MappableBlock.mlocker = mlocker;
+ // Restore the original CacheManipulator
+ NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
}
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
@@ -214,8 +228,7 @@ public class TestFsDatasetCache {
return expected;
}
- @Test(timeout=600000)
- public void testCacheAndUncacheBlock() throws Exception {
+ private void testCacheAndUncacheBlock() throws Exception {
LOG.info("beginning testCacheAndUncacheBlock");
final int NUM_BLOCKS = 5;
@@ -269,6 +282,42 @@ public class TestFsDatasetCache {
}
@Test(timeout=600000)
+ public void testCacheAndUncacheBlockSimple() throws Exception {
+ testCacheAndUncacheBlock();
+ }
+
+ /**
+ * Run testCacheAndUncacheBlock with some failures injected into the mlock
+ * call. This tests the ability of the NameNode to resend commands.
+ */
+ @Test(timeout=600000)
+ public void testCacheAndUncacheBlockWithRetries() throws Exception {
+ CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+
+ try {
+ NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+ private final Set<String> seenIdentifiers = new HashSet<String>();
+
+ @Override
+ public void mlock(String identifier,
+ ByteBuffer mmap, long length) throws IOException {
+ if (seenIdentifiers.contains(identifier)) {
+ // mlock succeeds the second time.
+ LOG.info("mlocking " + identifier);
+ return;
+ }
+ seenIdentifiers.add(identifier);
+ throw new IOException("injecting IOException during mlock of " +
+ identifier);
+ }
+ };
+ testCacheAndUncacheBlock();
+ } finally {
+ NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
+ }
+ }
+
+ @Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception {
LOG.info("beginning testFilesExceedMaxLockedMemory");
@@ -357,10 +406,11 @@ public class TestFsDatasetCache {
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed);
- MappableBlock.mlocker = new MappableBlock.Mlocker() {
+ NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() {
@Override
- public void mlock(MappedByteBuffer mmap, long length) throws IOException {
- LOG.info("An mlock operation is starting.");
+ public void mlock(String identifier,
+ ByteBuffer mmap, long length) throws IOException {
+ LOG.info("An mlock operation is starting on " + identifier);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java Thu Nov 14 23:56:56 2013
@@ -93,6 +93,10 @@ public class TestCorruptFilesJsp {
in.close();
}
+ try {
+ Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
+ } catch (InterruptedException ie) {}
+
// verify if all corrupt files were reported to NN
badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java Thu Nov 14 23:56:56 2013
@@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@@ -81,15 +83,7 @@ public class TestPathBasedCacheRequests
static private MiniDFSCluster cluster;
static private DistributedFileSystem dfs;
static private NamenodeProtocols proto;
-
- static {
- MappableBlock.mlocker = new MappableBlock.Mlocker() {
- @Override
- public void mlock(MappedByteBuffer mmap, long length) throws IOException {
- // Stubbed out for testing
- }
- };
- }
+ static private CacheManipulator prevCacheManipulator;
@Before
public void setup() throws Exception {
@@ -101,6 +95,18 @@ public class TestPathBasedCacheRequests
cluster.waitActive();
dfs = cluster.getFileSystem();
proto = cluster.getNameNodeRpc();
+ prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
+
+ // The current CacheManipulator was saved above and is restored at the end of
+ // the test. Stub out mlock calls to avoid failing when not enough memory is
+ // lockable by the operating system.
+ NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
+ @Override
+ public void mlock(String identifier,
+ ByteBuffer mmap, long length) throws IOException {
+ LOG.info("mlocking " + identifier);
+ }
+ };
}
@After
@@ -108,6 +114,8 @@ public class TestPathBasedCacheRequests
if (cluster != null) {
cluster.shutdown();
}
+ // Restore the original CacheManipulator
+ NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
}
@Test(timeout=60000)
@@ -552,8 +560,8 @@ public class TestPathBasedCacheRequests
* @throws Exception
*/
private static void waitForCachedBlocks(NameNode nn,
- final int expectedCachedBlocks, final int expectedCachedReplicas)
- throws Exception {
+ final int expectedCachedBlocks, final int expectedCachedReplicas,
+ final String logString) throws Exception {
final FSNamesystem namesystem = nn.getNamesystem();
final CacheManager cacheManager = namesystem.getCacheManager();
LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
@@ -581,9 +589,9 @@ public class TestPathBasedCacheRequests
(numCachedReplicas == expectedCachedReplicas)) {
return true;
} else {
- LOG.info("cached blocks: have " + numCachedBlocks +
- " / " + expectedCachedBlocks);
- LOG.info("cached replicas: have " + numCachedReplicas +
+ LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+ " / " + expectedCachedBlocks + ". " +
+ "cached replicas: have " + numCachedReplicas +
" / " + expectedCachedReplicas);
return false;
}
@@ -681,7 +689,7 @@ public class TestPathBasedCacheRequests
paths.add(p.toUri().getPath());
}
// Check the initial statistics at the namenode
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
@@ -692,7 +700,8 @@ public class TestPathBasedCacheRequests
build();
nnRpc.addPathBasedCacheDirective(directive);
expected += numBlocksPerFile;
- waitForCachedBlocks(namenode, expected, expected);
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:1");
}
// Uncache and check each path in sequence
RemoteIterator<PathBasedCacheDirective> entries =
@@ -701,7 +710,8 @@ public class TestPathBasedCacheRequests
PathBasedCacheDirective directive = entries.next();
nnRpc.removePathBasedCacheDirective(directive.getId());
expected -= numBlocksPerFile;
- waitForCachedBlocks(namenode, expected, expected);
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:2");
}
} finally {
cluster.shutdown();
@@ -735,7 +745,8 @@ public class TestPathBasedCacheRequests
paths.add(p.toUri().getPath());
}
// Check the initial statistics at the namenode
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:0");
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
@@ -745,10 +756,12 @@ public class TestPathBasedCacheRequests
setPool(pool).
build();
dfs.addPathBasedCacheDirective(directive);
- waitForCachedBlocks(namenode, expected, 0);
+ waitForCachedBlocks(namenode, expected, 0,
+ "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:1");
}
Thread.sleep(20000);
- waitForCachedBlocks(namenode, expected, 0);
+ waitForCachedBlocks(namenode, expected, 0,
+ "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:2");
} finally {
cluster.shutdown();
}
@@ -781,7 +794,8 @@ public class TestPathBasedCacheRequests
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:0");
// cache entire directory
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
@@ -789,10 +803,12 @@ public class TestPathBasedCacheRequests
setReplication((short)2).
setPool(pool).
build());
- waitForCachedBlocks(namenode, 4, 8);
+ waitForCachedBlocks(namenode, 4, 8,
+ "testWaitForCachedReplicasInDirectory:1");
// remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id);
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:2");
} finally {
cluster.shutdown();
}
@@ -830,7 +846,7 @@ public class TestPathBasedCacheRequests
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
checkNumCachedReplicas(dfs, paths, 0, 0);
// cache directory
long id = dfs.addPathBasedCacheDirective(
@@ -839,7 +855,7 @@ public class TestPathBasedCacheRequests
setReplication((short)1).
setPool(pool).
build());
- waitForCachedBlocks(namenode, 4, 4);
+ waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
checkNumCachedReplicas(dfs, paths, 4, 4);
// step up the replication factor
for (int i=2; i<=3; i++) {
@@ -848,7 +864,7 @@ public class TestPathBasedCacheRequests
setId(id).
setReplication((short)i).
build());
- waitForCachedBlocks(namenode, 4, 4*i);
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// step it down
@@ -858,12 +874,12 @@ public class TestPathBasedCacheRequests
setId(id).
setReplication((short)i).
build());
- waitForCachedBlocks(namenode, 4, 4*i);
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id);
- waitForCachedBlocks(namenode, 0, 0);
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
checkNumCachedReplicas(dfs, paths, 0, 0);
} finally {
cluster.shutdown();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java Thu Nov 14 23:56:56 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -47,7 +48,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.io.IOUtils;
@@ -949,4 +953,54 @@ public class TestSnapshotDeletion {
psOut.close();
out.close();
}
+
+ /*
+ * Regression test for HDFS-5504: the OP_DELETE_SNAPSHOT edit log op was not
+ * decrementing the safemode threshold on restart in HA mode.
+ */
+ @Test(timeout = 60000)
+ public void testHANNRestartAfterSnapshotDeletion() throws Exception {
+ hdfs.close();
+ cluster.shutdown();
+ conf = new Configuration();
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1)
+ .build();
+ cluster.transitionToActive(0);
+ // stop the standby namenode
+ NameNode snn = cluster.getNameNode(1);
+ snn.stop();
+
+ hdfs = (DistributedFileSystem) HATestUtil
+ .configureFailoverFs(cluster, conf);
+ Path dir = new Path("/dir");
+ Path subDir = new Path(dir, "sub");
+ hdfs.mkdirs(dir);
+ hdfs.allowSnapshot(dir);
+ for (int i = 0; i < 5; i++) {
+ DFSTestUtil.createFile(hdfs, new Path(subDir, "" + i), 100, (short) 1,
+ 1024L);
+ }
+
+ // take snapshot
+ hdfs.createSnapshot(dir, "s0");
+
+ // delete the subdir
+ hdfs.delete(subDir, true);
+
+ // roll the edit log
+ NameNode ann = cluster.getNameNode(0);
+ ann.getRpcServer().rollEditLog();
+
+ hdfs.deleteSnapshot(dir, "s0");
+ // wait for the blocks deletion at namenode
+ Thread.sleep(2000);
+
+ NameNodeAdapter.abortEditLogs(ann);
+ cluster.restartNameNode(0, false);
+ cluster.transitionToActive(0);
+
+ // wait till the cluster becomes active
+ cluster.waitClusterUp();
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java Thu Nov 14 23:56:56 2013
@@ -22,7 +22,6 @@ import static org.apache.hadoop.fs.Commo
import static org.junit.Assert.*;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
@@ -40,6 +39,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestHftpDelegationToken {
@@ -71,9 +71,8 @@ public class TestHftpDelegationToken {
});
assertSame("wrong kind of file system", HftpFileSystem.class,
fs.getClass());
- Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken");
- renewToken.setAccessible(true);
- assertSame("wrong token", token, renewToken.get(fs));
+ assertSame("wrong token", token,
+ Whitebox.getInternalState(fs, "renewToken"));
}
@Test
@@ -81,7 +80,7 @@ public class TestHftpDelegationToken {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
- conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
+ conf.setClass("fs.hftp.impl", HftpFileSystem.class, FileSystem.class);
int httpPort = 80;
int httpsPort = 443;
@@ -90,21 +89,21 @@ public class TestHftpDelegationToken {
// test with implicit default port
URI fsUri = URI.create("hftp://localhost");
- MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ HftpFileSystem fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort, conf);
// test with explicit default port
// Make sure it uses the port from the hftp URI.
fsUri = URI.create("hftp://localhost:"+httpPort);
- fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort, conf);
// test with non-default port
// Make sure it uses the port from the hftp URI.
fsUri = URI.create("hftp://localhost:"+(httpPort+1));
- fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort + 1, conf);
@@ -116,7 +115,7 @@ public class TestHftpDelegationToken {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
- conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class);
+ conf.setClass("fs.hsftp.impl", HsftpFileSystem.class, FileSystem.class);
int httpPort = 80;
int httpsPort = 443;
@@ -125,19 +124,19 @@ public class TestHftpDelegationToken {
// test with implicit default port
URI fsUri = URI.create("hsftp://localhost");
- MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ HsftpFileSystem fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with explicit default port
fsUri = URI.create("hsftp://localhost:"+httpsPort);
- fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with non-default port
fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
- fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort+1, conf);
@@ -197,6 +196,9 @@ public class TestHftpDelegationToken {
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
+ @SuppressWarnings("unchecked")
+ TokenAspect<HftpFileSystem> aspect = (TokenAspect<HftpFileSystem>) Whitebox.getInternalState(fs, "tokenAspect");
+
// use ip-based tokens
SecurityUtilTestHelper.setTokenServiceUseIp(true);
@@ -208,7 +210,7 @@ public class TestHftpDelegationToken {
ugi.addToken(hdfsToken);
// test fallback to hdfs token
- Token<?> token = fs.selectDelegationToken(ugi);
+ Token<?> token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
@@ -217,13 +219,13 @@ public class TestHftpDelegationToken {
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
ugi.addToken(hftpToken);
- token = fs.selectDelegationToken(ugi);
+ token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hftpToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
- token = fs.selectDelegationToken(ugi);
+ token = aspect.selectDelegationToken(ugi);
assertNull(token);
// test fallback to hdfs token
@@ -232,7 +234,7 @@ public class TestHftpDelegationToken {
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("localhost:8020"));
ugi.addToken(hdfsToken);
- token = fs.selectDelegationToken(ugi);
+ token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
@@ -241,36 +243,8 @@ public class TestHftpDelegationToken {
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
ugi.addToken(hftpToken);
- token = fs.selectDelegationToken(ugi);
+ token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hftpToken, token);
}
-
- static class MyHftpFileSystem extends HftpFileSystem {
- @Override
- public URI getCanonicalUri() {
- return super.getCanonicalUri();
- }
- @Override
- public int getDefaultPort() {
- return super.getDefaultPort();
- }
- // don't automatically get a token
- @Override
- protected void initDelegationToken() throws IOException {}
- }
-
- static class MyHsftpFileSystem extends HsftpFileSystem {
- @Override
- public URI getCanonicalUri() {
- return super.getCanonicalUri();
- }
- @Override
- public int getDefaultPort() {
- return super.getDefaultPort();
- }
- // don't automatically get a token
- @Override
- protected void initDelegationToken() throws IOException {}
- }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java Thu Nov 14 23:56:56 2013
@@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
@@ -37,10 +39,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -59,6 +63,7 @@ import org.jboss.netty.channel.socket.ni
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -78,9 +83,10 @@ public class TestDelegationTokenRemoteFe
private static final String EXP_DATE = "124123512361236";
private static final String tokenFile = "http.file.dta";
+ private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
private int httpPort;
- private String serviceUrl;
+ private URI serviceUrl;
private FileSystem fileSys;
private Configuration conf;
private ServerBootstrap bootstrap;
@@ -92,7 +98,7 @@ public class TestDelegationTokenRemoteFe
conf = new Configuration();
fileSys = FileSystem.getLocal(conf);
httpPort = NetUtils.getFreeSocketPort();
- serviceUrl = "http://localhost:" + httpPort;
+ serviceUrl = new URI("http://localhost:" + httpPort);
testToken = createToken(serviceUrl);
}
@@ -121,9 +127,9 @@ public class TestDelegationTokenRemoteFe
* try to fetch token without http server with IOException
*/
@Test
- public void testTokenRenewFail() {
+ public void testTokenRenewFail() throws AuthenticationException {
try {
- DelegationTokenFetcher.renewDelegationToken(serviceUrl, testToken);
+ DelegationTokenFetcher.renewDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
} catch (IOException ex) {
}
@@ -133,9 +139,9 @@ public class TestDelegationTokenRemoteFe
* try cancel token without http server with IOException
*/
@Test
- public void expectedTokenCancelFail() {
+ public void expectedTokenCancelFail() throws AuthenticationException {
try {
- DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+ DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
} catch (IOException ex) {
}
@@ -145,11 +151,12 @@ public class TestDelegationTokenRemoteFe
* try fetch token and get http response with error
*/
@Test
- public void expectedTokenRenewErrorHttpResponse() {
+ public void expectedTokenRenewErrorHttpResponse()
+ throws AuthenticationException, URISyntaxException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
try {
- DelegationTokenFetcher.renewDelegationToken(serviceUrl + "/exception",
- createToken(serviceUrl));
+ DelegationTokenFetcher.renewDelegationToken(connectionFactory, new URI(
+ serviceUrl.toString() + "/exception"), createToken(serviceUrl));
fail("Token fetcher shouldn't be able to renew tokens using an invalid"
+ " NN URL");
} catch (IOException ex) {
@@ -159,13 +166,14 @@ public class TestDelegationTokenRemoteFe
}
/**
- *
*
*/
@Test
- public void testCancelTokenFromHttp() throws IOException {
+ public void testCancelTokenFromHttp() throws IOException,
+ AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
- DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+ DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl,
+ testToken);
if (assertionError != null)
throw assertionError;
}
@@ -174,11 +182,12 @@ public class TestDelegationTokenRemoteFe
* Call renew token using http server return new expiration time
*/
@Test
- public void testRenewTokenFromHttp() throws IOException {
+ public void testRenewTokenFromHttp() throws IOException,
+ NumberFormatException, AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
assertTrue("testRenewTokenFromHttp error",
Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
- serviceUrl, testToken));
+ connectionFactory, serviceUrl, testToken));
if (assertionError != null)
throw assertionError;
}
@@ -204,11 +213,11 @@ public class TestDelegationTokenRemoteFe
throw assertionError;
}
- private static Token<DelegationTokenIdentifier> createToken(String serviceUri) {
+ private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
byte[] pw = "hadoop".getBytes();
byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
"renewer"), new Text("realuser")).getBytes();
- Text service = new Text(serviceUri);
+ Text service = new Text(serviceUri.toString());
return new Token<DelegationTokenIdentifier>(ident, pw,
HftpFileSystem.TOKEN_KIND, service);
}
@@ -301,8 +310,15 @@ public class TestDelegationTokenRemoteFe
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- return;
+
+ if (request.getMethod() == HttpMethod.OPTIONS) {
+ // Mimic SPNEGO authentication
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
+ HttpResponseStatus.OK);
+ response.addHeader("Set-Cookie", "hadoop-auth=1234");
+ e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ } else if (request.getMethod() != GET) {
+ e.getChannel().close();
}
UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
.iterator();
@@ -338,7 +354,7 @@ public class TestDelegationTokenRemoteFe
}
private ServerBootstrap startHttpServer(int port,
- final Token<DelegationTokenIdentifier> token, final String url) {
+ final Token<DelegationTokenIdentifier> token, final URI url) {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
@@ -348,7 +364,7 @@ public class TestDelegationTokenRemoteFe
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new HttpRequestDecoder(),
new HttpChunkAggregator(65536), new HttpResponseEncoder(),
- new CredentialsLogicHandler(token, url));
+ new CredentialsLogicHandler(token, url.toString()));
}
});
bootstrap.bind(new InetSocketAddress("localhost", port));