You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/08/04 01:46:51 UTC
svn commit: r982087 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/security/
src/test/mapred/org/apache/hadoop/mapreduce/security/
src/tools/org/apache/hadoop/tools/
Author: ddas
Date: Tue Aug 3 23:46:51 2010
New Revision: 982087
URL: http://svn.apache.org/viewvc?rev=982087&view=rev
Log:
MAPREDUCE-1958. The MapReduce part corresponding to the HADOOP-6873. Contributed by Boris Shkolnik & Owen O'Malley.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 3 23:46:51 2010
@@ -223,6 +223,9 @@ Trunk (unreleased changes)
of configuration properties "mapreduce.job.name" and "mapred.job.name".
(Ravi Gummadi via amareshwari)
+ MAPREDUCE-1958. The MapReduce part corresponding to the HADOOP-6873.
+ (Boris Shkolnik & Owen O'Malley via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Tue Aug 3 23:46:51 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.proto
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -340,12 +342,7 @@ class JobSubmitter {
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
- // load the binary file, if the user has one
- String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
- if (binaryTokenFilename != null) {
- job.getCredentials().readTokenStorageFile(
- new Path("file:///" + binaryTokenFilename), conf);
- }
+ populateTokenCache(conf, job.getCredentials());
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -364,7 +361,7 @@ class JobSubmitter {
//
// Now, actually submit the job (using the submit name)
//
- populateTokenCache(conf, job.getCredentials());
+ printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
@@ -411,6 +408,21 @@ class JobSubmitter {
}
+
+ @SuppressWarnings("unchecked")
+ private void printTokens(JobID jobId,
+ Credentials credentials) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Printing tokens for job: " + jobId);
+ for(Token<?> token: credentials.getAllTokens()) {
+ if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
+ LOG.debug("Submitting with " +
+ DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) token));
+ }
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
@@ -494,11 +506,18 @@ class JobSubmitter {
}
}
- // get secret keys and tokens and store them into TokenCache
@SuppressWarnings("unchecked")
- private void populateTokenCache(Configuration conf, Credentials credentials)
- throws IOException {
- // create TokenStorage object with user secretKeys
+ private void readTokensFromFiles(Configuration conf, Credentials credentials)
+ throws IOException {
+ // add tokens and secrets coming from a token storage file
+ String binaryTokenFilename =
+ conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ Credentials binary = Credentials.readTokenStorageFile(
+ new Path("file:///" + binaryTokenFilename), conf);
+ credentials.addAll(binary);
+ }
+ // add secret keys coming from a json file
String tokensFileName = conf.get("mapreduce.job.credentials.json");
if(tokensFileName != null) {
LOG.info("loading user's secret keys from " + tokensFileName);
@@ -523,10 +542,17 @@ class JobSubmitter {
if(json_error)
LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
}
-
+ }
+
+ //get secret keys and tokens and store them into TokenCache
+ @SuppressWarnings("unchecked")
+ private void populateTokenCache(Configuration conf, Credentials credentials)
+ throws IOException{
+ readTokensFromFiles(conf, credentials);
// add the delegation tokens from configuration
String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
- LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+ LOG.debug("adding the following namenodes' delegation tokens:" +
+ Arrays.toString(nameNodes));
if(nameNodes != null) {
Path [] ps = new Path[nameNodes.length];
for(int i=0; i< nameNodes.length; i++) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Aug 3 23:46:51 2010
@@ -106,74 +106,40 @@ public class TokenCache {
// get jobtracker principal id (for the renewer)
KerberosName jtKrbName =
new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
- Text delegTokenRenewer = new Text(jtKrbName.getShortName());
-
- boolean notReadFile = true;
-
- // TODO: Connecting to the namenode is not required in the case,
- // where we already have the credentials in the file
- if(fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- URI uri = dfs.getUri();
- String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+
+ String delegTokenRenewer = jtKrbName.getShortName();
+ boolean readFile = true;
- // see if we already have the token
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- return;
- }
- if (notReadFile) { //read the file only once
+ String fsName = fs.getCanonicalServiceName();
+ if (TokenCache.getDelegationToken(credentials, fsName) == null) {
+ //TODO: Need to come up with a better place to put
+ //this block of code to do with reading the file
+ if (readFile) {
+ readFile = false;
String binaryTokenFilename =
conf.get("mapreduce.job.credentials.binary");
if (binaryTokenFilename != null) {
- credentials.readTokenStorageFile(new Path("file:///" +
- binaryTokenFilename), conf);
+ Credentials binary;
+ try {
+ binary = Credentials.readTokenStorageFile(
+ new Path("file:///" + binaryTokenFilename), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ credentials.addAll(binary);
}
- notReadFile = false;
- token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
+ if (TokenCache.getDelegationToken(credentials, fsName) != null) {
+ LOG.debug("DT for " + fsName + " is already present");
return;
}
}
- // get the token
- token = dfs.getDelegationToken(delegTokenRenewer);
- if(token==null)
- throw new IOException("Token from " + fs_addr + " is null");
-
- token.setService(new Text(fs_addr));
- credentials.addToken(new Text(fs_addr), token);
- LOG.info("Got dt for " + uri + ";uri="+ fs_addr +
- ";t.service="+token.getService());
- } else if (fs instanceof HftpFileSystem) {
- HftpFileSystem hfs = (HftpFileSystem)fs;
- URI uri = hfs.getUri();
- String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- return;
- }
- //the initialize method of hftp, called via FileSystem.get() done
- //earlier gets a delegation token
-
- Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
- if (t != null) {
- credentials.addToken(new Text(fs_addr), t);
-
- // in this case port in fs_addr is port for hftp request, but
- // token's port is for RPC
- // to find the correct DT we need to know the mapping between Hftp port
- // and RPC one. hence this new setting in the conf.
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
- conf.set(key, t.getService().toString());
- LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "="
- + t.getService());
-
+ Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+ if (token != null) {
+ Text fsNameText = new Text(fsName);
+ token.setService(fsNameText);
+ credentials.addToken(fsNameText, token);
+ LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
+ ";t.service="+token.getService());
}
}
}
@@ -213,14 +179,14 @@ public class TokenCache {
public static Credentials loadTokens(String jobTokenFile, JobConf conf)
throws IOException {
Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-
- Credentials ts = new Credentials();
- ts.readTokenStorageFile(localJobTokenFile, conf);
+
+ Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
if(LOG.isDebugEnabled()) {
- LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath()
- +"; num of sec keys = " + ts.numberOfSecretKeys() + " Number of tokens " +
- ts.numberOfTokens());
+ LOG.debug("Task: Loaded jobTokenFile from: "+
+ localJobTokenFile.toUri().getPath()
+ +"; num of sec keys = " + ts.numberOfSecretKeys() +
+ " Number of tokens " + ts.numberOfTokens());
}
return ts;
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Tue Aug 3 23:46:51 2010
@@ -271,7 +271,6 @@ public class TestTokenCache {
@Test
public void testGetTokensForNamenodes() throws IOException {
- FileSystem fs = dfsCluster.getFileSystem();
Credentials credentials = new Credentials();
TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
@@ -305,13 +304,15 @@ public class TestTokenCache {
DelegationTokenSecretManager dtSecretManager =
dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+ String renewer = "renewer";
+ jConf.set(JTConfig.JT_USER_NAME,renewer);
DelegationTokenIdentifier dtId =
- new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+ new DelegationTokenIdentifier(new Text("user"), new Text(renewer), null);
final Token<DelegationTokenIdentifier> t =
new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
final URI uri = new URI("hftp://host:2222/file1");
- String fs_addr =
+ final String fs_addr =
SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
t.setService(new Text(fs_addr));
@@ -329,12 +330,19 @@ public class TestTokenCache {
public Token<DelegationTokenIdentifier> answer(InvocationOnMock invocation)
throws Throwable {
return t;
- }}).when(hfs).getDelegationToken();
-
-
+ }}).when(hfs).getDelegationToken(renewer);
+
+ //when(hfs.getCanonicalServiceName).thenReturn(fs_addr);
+ Mockito.doAnswer(new Answer<String>(){
+ @Override
+ public String answer(InvocationOnMock invocation)
+ throws Throwable {
+ return fs_addr;
+ }}).when(hfs).getCanonicalServiceName();
+
Credentials credentials = new Credentials();
Path p = new Path(uri.toString());
- System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+ System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr + "; rn=" + renewer);
TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
@@ -350,11 +358,6 @@ public class TestTokenCache {
}
assertTrue("didn't find token for " + p, found);
}
- // also check the conf value
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
- String confKey = jConf.get(key);
- assertEquals("jconf key for HFTP DT is not correct", confKey,
- t.getService().toString());
}
}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Tue Aug 3 23:46:51 2010
@@ -740,10 +740,6 @@ public class DistCp implements Tool {
List<IOException> rslt = new ArrayList<IOException>();
List<Path> unglobbed = new LinkedList<Path>();
- // get tokens for all the required FileSystems..
- // also set the renewer as the JobTracker for the hftp case
- jobConf.set(HftpFileSystem.HFTP_RENEWER,
- jobConf.get(JobTracker.JT_USER_NAME, ""));
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);