You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/24 00:56:43 UTC
svn commit: r967297 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Author: ddas
Date: Fri Jul 23 22:56:43 2010
New Revision: 967297
URL: http://svn.apache.org/viewvc?rev=967297&view=rev
Log:
MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the mapping that the tasks use at runtime for looking up delegation tokens. Contributed by Boris Shkolnik.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 22:56:43 2010
@@ -195,6 +195,10 @@ Trunk (unreleased changes)
MAPREDUCE-1925. Fix failing TestRumenJobTraces. (Ravi Gummadi via cdouglas)
+ MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the
+ mapping that the tasks use at runtime for looking up delegation tokens.
+ (Boris Shkolnik via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul 23 22:56:43 2010
@@ -36,9 +36,9 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosName;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,81 +86,98 @@ public class TokenCache {
static void obtainTokensForNamenodesInternal(Credentials credentials,
Path[] ps, Configuration conf) throws IOException {
+ for(Path p: ps) {
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ obtainTokensForNamenodesInternal(fs, credentials, conf);
+ }
+ }
+
+ /**
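+ * get delegation token for a specific FS
+ * @param fs file system to obtain the delegation token for
+ * @param credentials credentials to look the token up in and add it to
+ * @param conf configuration supplying the renewer principal and binary
+ * token file; for HFTP the service-name mapping is also stored in it
+ * @throws IOException if a delegation token cannot be obtained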
+ */
+ static void obtainTokensForNamenodesInternal(FileSystem fs,
+ Credentials credentials, Configuration conf) throws IOException {
+
// get jobtracker principal id (for the renewer)
- KerberosName jtKrbName = new KerberosName(conf.get(JTConfig.JT_USER_NAME,
- ""));
+ KerberosName jtKrbName =
+ new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
Text delegTokenRenewer = new Text(jtKrbName.getShortName());
+
boolean notReadFile = true;
- for(Path p: ps) {
- // TODO: Connecting to the namenode is not required in the case,
- // where we already have the credentials in the file
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- if(fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- URI uri = fs.getUri();
- String fs_addr = buildDTServiceName(uri);
-
- // see if we already have the token
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- if (notReadFile) { //read the file only once
- String binaryTokenFilename =
- conf.get("mapreduce.job.credentials.binary");
- if (binaryTokenFilename != null) {
- credentials.readTokenStorageFile(new Path("file:///" +
- binaryTokenFilename), conf);
- }
- notReadFile = false;
- token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
+
+ // TODO: Connecting to the namenode is not required in the case,
+ // where we already have the credentials in the file
+ if(fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ URI uri = dfs.getUri();
+ String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+
+ // see if we already have the token
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ return;
+ }
+ if (notReadFile) { //read the file only once
+ String binaryTokenFilename =
+ conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ credentials.readTokenStorageFile(new Path("file:///" +
+ binaryTokenFilename), conf);
}
- // get the token
- token = dfs.getDelegationToken(delegTokenRenewer);
- if(token==null)
- throw new IOException("Token from " + fs_addr + " is null");
-
- token.setService(new Text(fs_addr));
- credentials.addToken(new Text(fs_addr), token);
- LOG.info("Got dt for " + p + ";uri="+ fs_addr +
- ";t.service="+token.getService());
- } else if (fs instanceof HftpFileSystem) {
- String fs_addr = buildDTServiceName(fs.getUri());
- Token<DelegationTokenIdentifier> token =
+ notReadFile = false;
+ token =
TokenCache.getDelegationToken(credentials, fs_addr);
if(token != null) {
LOG.debug("DT for " + token.getService() + " is already present");
- continue;
+ return;
}
- //the initialize method of hftp, called via FileSystem.get() done
- //earlier gets a delegation token
+ }
+ // get the token
+ token = dfs.getDelegationToken(delegTokenRenewer);
+ if(token==null)
+ throw new IOException("Token from " + fs_addr + " is null");
+
+ token.setService(new Text(fs_addr));
+ credentials.addToken(new Text(fs_addr), token);
+ LOG.info("Got dt for " + uri + ";uri="+ fs_addr +
+ ";t.service="+token.getService());
+ } else if (fs instanceof HftpFileSystem) {
+ HftpFileSystem hfs = (HftpFileSystem)fs;
+ URI uri = hfs.getUri();
+ String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ return;
+ }
+ //the initialize method of hftp, called via FileSystem.get() done
+ //earlier gets a delegation token
- Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
- if (t != null) {
- credentials.addToken(new Text(fs_addr), t);
-
- // in this case port in fs_addr is port for hftp request, but
- // token's port is for RPC
- // to find the correct DT we need to know the mapping between Hftp port
- // and RPC one. hence this new setting in the conf.
- URI uri = ((HftpFileSystem) fs).getUri();
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+buildDTServiceName(uri);
- conf.set(key, t.getService().toString());
- LOG.info("GOT dt for " + p + " and stored in conf as " + key + "="
- + t.getService());
+ Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
+ if (t != null) {
+ credentials.addToken(new Text(fs_addr), t);
+
+ // in this case port in fs_addr is port for hftp request, but
+ // token's port is for RPC
+ // to find the correct DT we need to know the mapping between Hftp port
+ // and RPC one. hence this new setting in the conf.
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
+ conf.set(key, t.getService().toString());
+ LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "="
+ + t.getService());
- }
}
}
}
-
+
/**
* file name used on HDFS for generated job token
*/
@@ -225,15 +242,4 @@ public class TokenCache {
public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
-
- public static String buildDTServiceName(URI uri) {
- int port = uri.getPort();
- if(port == -1)
- port = NameNode.DEFAULT_PORT;
-
- // build the service name string "ip:port"
- StringBuffer sb = new StringBuffer();
- sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
- return sb.toString();
- }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul 23 22:56:43 2010
@@ -22,9 +22,12 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.HashMap;
@@ -37,8 +40,11 @@ import org.apache.commons.codec.binary.B
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -46,18 +52,20 @@ import org.apache.hadoop.mapred.MiniMRCl
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Assert;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestTokenCache {
private static final int NUM_OF_KEYS = 10;
@@ -79,7 +87,7 @@ public class TestTokenCache {
dts_size = dts.size();
- if(dts.size() != 2) { // one job token and one delegation token
+ if(dts_size != 2) { // one job token and one delegation token
throw new RuntimeException("tokens are not available"); // fail the test
}
@@ -270,7 +278,8 @@ public class TestTokenCache {
p2 }, jConf);
// this token is keyed by hostname:port key.
- String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
+ String fs_addr =
+ SecurityUtil.buildDTServiceName(p1.toUri(), NameNode.DEFAULT_PORT);
Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
credentials, fs_addr);
System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt);
@@ -289,4 +298,63 @@ public class TestTokenCache {
assertTrue("didn't find token for " + p1 ,found);
}
}
+
+ @Test
+ public void testGetTokensForHftpFS() throws IOException, URISyntaxException {
+ HftpFileSystem hfs = mock(HftpFileSystem.class);
+
+ DelegationTokenSecretManager dtSecretManager =
+ dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+ DelegationTokenIdentifier dtId =
+ new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+ final Token<DelegationTokenIdentifier> t =
+ new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
+
+ final URI uri = new URI("hftp://host:2222/file1");
+ String fs_addr =
+ SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+ t.setService(new Text(fs_addr));
+
+ //when(hfs.getUri()).thenReturn(uri);
+ Mockito.doAnswer(new Answer<URI>(){
+ @Override
+ public URI answer(InvocationOnMock invocation)
+ throws Throwable {
+ return uri;
+ }}).when(hfs).getUri();
+
+ //when(hfs.getDelegationToken()).thenReturn((Token<? extends TokenIdentifier>) t);
+ Mockito.doAnswer(new Answer<Token<DelegationTokenIdentifier>>(){
+ @Override
+ public Token<DelegationTokenIdentifier> answer(InvocationOnMock invocation)
+ throws Throwable {
+ return t;
+ }}).when(hfs).getDelegationToken();
+
+
+ Credentials credentials = new Credentials();
+ Path p = new Path(uri.toString());
+ System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+ TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
+
+ Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
+ assertEquals("number of tokens is not 1", 1, tns.size());
+
+ boolean found = false;
+ for(Token<? extends TokenIdentifier> tt: tns) {
+ System.out.println("token="+tt);
+ if(tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
+ tt.getService().equals(new Text(fs_addr))) {
+ found = true;
+ assertEquals("different token", tt, t);
+ }
+ assertTrue("didn't find token for " + p, found);
+ }
+ // also check the conf value
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
+ String confKey = jConf.get(key);
+ assertEquals("jconf key for HFTP DT is not correct", confKey,
+ t.getService().toString());
+ }
+
}