You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:45:20 UTC
svn commit: r1077141 - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/mapred/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/
src/mapred/org/apache/hadoop/mapreduce/lib/input/ sr...
Author: omalley
Date: Fri Mar 4 03:45:19 2011
New Revision: 1077141
URL: http://svn.apache.org/viewvc?rev=1077141&view=rev
Log:
commit 5dd9e6a03d8f026c4ed4af85013bff0ca2cc34a5
Author: Boris Shkolnik <bo...@yahoo-inc.com>
Date: Mon Feb 1 22:52:13 2010 -0800
MAPREDUCE-1383 from https://issues.apache.org/jira/secure/attachment/12434455/MAPREDUCE-1383-BP20-7.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1383. Automates fetching of delegation tokens in File*Formats,
+ Distributed Cache and Distcp. Also provides a config, mapreduce.job.hdfs-servers,
+ that jobs can populate with a comma-separated list of namenodes.
+ The job client automatically fetches delegation tokens from those namenodes.
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestTokenCache.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/build.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar 4 03:45:19 2011
@@ -367,7 +367,7 @@
</target>
- <target name="compile-mapred-classes" depends="compile-core-classes">
+ <target name="compile-mapred-classes" depends="compile-core-classes,compile-hdfs-classes">
<jsp-compile
uriroot="${src.webapps}/task"
outputdir="${build.src}"
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar 4 03:45:19 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.mapreduce.security.TokenCache;
/**
* Manages a single machine's instance of a cross-job
@@ -725,4 +726,32 @@ public class TrackerDistributedCacheMana
static void setFileVisibilities(Configuration conf, String booleans) {
conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
}
+
+ /**
+ * For each archive or cache file - get the corresponding delegation token
+ * @param job
+ * @throws IOException
+ */
+ public static void getDelegationTokens(Configuration job) throws IOException {
+ URI[] tarchives = DistributedCache.getCacheArchives(job);
+ URI[] tfiles = DistributedCache.getCacheFiles(job);
+
+ int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
+ Path[] ps = new Path[size];
+
+ int i = 0;
+ if (tarchives != null) {
+ for (i=0; i < tarchives.length; i++) {
+ ps[i] = new Path(tarchives[i].toString());
+ }
+ }
+
+ if (tfiles != null) {
+ for(int j=0; j< tfiles.length; j++) {
+ ps[i+j] = new Path(tfiles[j].toString());
+ }
+ }
+
+ TokenCache.obtainTokensForNamenodes(ps, job);
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 03:45:19 2011
@@ -26,25 +26,20 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
-import org.apache.log4j.LogManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
/**
* The main() for child processes.
@@ -72,8 +67,7 @@ class Child {
// file name is passed thru env
String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
- defaultConf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
- TokenStorage ts = TokenCache.loadTaskTokenStorage(defaultConf);
+ TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() +
"; from file=" + jobTokenFile);
@@ -155,7 +149,7 @@ class Child {
JobConf job = new JobConf(task.getJobFile());
// set job shuffle token
- Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
+ Token<? extends TokenIdentifier> jt = ts.getJobToken();
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Mar 4 03:45:19 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
@@ -152,6 +153,9 @@ public abstract class FileInputFormat<K,
throw new IOException("No input paths specified in job");
}
+ // get tokens for all the required FileSystems..
+ TokenCache.obtainTokensForNamenodes(dirs, job);
+
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Mar 4 03:45:19 2011
@@ -24,6 +24,7 @@ import java.text.NumberFormat;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.Progressable;
/** A base class for {@link OutputFormat}. */
@@ -106,6 +107,10 @@ public abstract class FileOutputFormat<K
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
+
+ // get delegation token for the outDir's file system
+ TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+
// check its existence
if (fs.exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 03:45:19 2011
@@ -64,7 +64,7 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -72,6 +72,8 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
/**
@@ -639,6 +641,8 @@ public class JobClient extends Configure
TrackerDistributedCacheManager.determineTimestamps(job);
// set the public/private visibility of the archives and files
TrackerDistributedCacheManager.determineCacheVisibilities(job);
+ // get DelegationTokens for cache files
+ TrackerDistributedCacheManager.getDelegationTokens(job);
String originalJarPath = job.getJar();
@@ -726,7 +730,12 @@ public class JobClient extends Configure
job.set("mapreduce.job.dir", submitJobDir.toString());
JobStatus status = null;
try {
+
copyAndConfigureFiles(job, submitJobDir);
+
+ // get delegation token for the dir
+ TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, job);
+
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
int reduces = job.getNumReduceTasks();
JobContext context = new JobContext(job, jobId);
@@ -756,29 +765,13 @@ public class JobClient extends Configure
out.close();
}
- // create TokenStorage object with user secretKeys
- String tokensFileName = job.get("tokenCacheFile");
- TokenStorage tokenStorage = null;
- if(tokensFileName != null) {
- LOG.info("loading secret keys from " + tokensFileName);
- String localFileName = new Path(tokensFileName).toUri().getPath();
- tokenStorage = new TokenStorage();
- // read JSON
- ObjectMapper mapper = new ObjectMapper();
- Map<String, String> nm =
- mapper.readValue(new File(localFileName), Map.class);
-
- for(Map.Entry<String, String> ent: nm.entrySet()) {
- LOG.debug("adding secret key alias="+ent.getKey());
- tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
- }
- }
//
// Now, actually submit the job (using the submit name)
//
+ populateTokenCache(job);
status = jobSubmitClient.submitJob(
- jobId, submitJobDir.toString(), tokenStorage);
+ jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
if (status != null) {
return new NetworkedJob(status);
} else {
@@ -1789,5 +1782,45 @@ public class JobClient extends Configure
int res = ToolRunner.run(new JobClient(), argv);
System.exit(res);
}
+
+ //get secret keys and tokens and store them into TokenCache
+ @SuppressWarnings("unchecked")
+ private void populateTokenCache(Configuration conf) throws IOException{
+ // create TokenStorage object with user secretKeys
+ String tokensFileName = conf.get("tokenCacheFile");
+ if(tokensFileName != null) {
+ LOG.info("loading user's secret keys from " + tokensFileName);
+ String localFileName = new Path(tokensFileName).toUri().getPath();
+
+ boolean json_error = false;
+ try {
+ // read JSON
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> nm =
+ mapper.readValue(new File(localFileName), Map.class);
+
+ for(Map.Entry<String, String> ent: nm.entrySet()) {
+ TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+ }
+ } catch (JsonMappingException e) {
+ json_error = true;
+ } catch (JsonParseException e) {
+ json_error = true;
+ }
+ if(json_error)
+ LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
+ }
+
+ // add the delegation tokens from configuration
+ String [] nameNodes = conf.getStrings(JobContext.JOB_NAMENODES);
+ LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+ if(nameNodes != null) {
+ Path [] ps = new Path[nameNodes.length];
+ for(int i=0; i< nameNodes.length; i++) {
+ ps[i] = new Path(nameNodes[i]);
+ }
+ TokenCache.obtainTokensForNamenodes(ps, conf);
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:45:19 2011
@@ -43,7 +43,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.TokenStorage;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.metrics.MetricsContext;
@@ -3090,7 +3090,7 @@ class JobInProgress {
*/
private void generateAndStoreTokens() throws IOException {
Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
- Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+ Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
// we need to create this file using the jobtracker's filesystem
FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
//create JobToken file and write token to it
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:45:19 2011
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
@@ -497,7 +498,7 @@ abstract class TaskRunner extends Thread
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
- String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+ String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
env.put("JOB_TOKEN_FILE", jobTokenFile);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:45:19 2011
@@ -927,6 +927,7 @@ public class TaskTracker
new LocalDirAllocator("mapred.local.dir");
// intialize the job directory
+ @SuppressWarnings("unchecked")
private void localizeJob(TaskInProgress tip)
throws IOException, InterruptedException {
Path localJarFile = null;
@@ -955,7 +956,8 @@ public class TaskTracker
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
- TokenStorage ts = TokenCache.loadTokens(rjob.jobConf);
+ TokenStorage ts = TokenCache.loadTokens(rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
+
Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -3707,7 +3709,7 @@ public class TaskTracker
throws IOException {
// check if the tokenJob file is there..
Path skPath = new Path(systemDirectory,
- jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
+ jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
FileStatus status = null;
long jobTokenSize = -1;
@@ -3724,7 +3726,7 @@ public class TaskTracker
// Download job_token
systemFS.copyToLocalFile(skPath, localJobTokenFile);
// set it into jobConf to transfer the name to TaskRunner
- jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
+ jobConf.set(TokenCache.JOB_TOKEN_FILENAME, localJobTokenFile.toString());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar 4 03:45:19 2011
@@ -48,7 +48,8 @@ public class JobContext {
protected final org.apache.hadoop.mapred.JobConf conf;
private final JobID jobId;
- public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
+ public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+ public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
public static final String CACHE_FILE_VISIBILITIES =
"mapreduce.job.cache.files.visibilities";
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Mar 4 03:45:19 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -185,6 +186,9 @@ public abstract class FileInputFormat<K,
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
+
+ // get tokens for all the required FileSystems..
+ TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
List<IOException> errors = new ArrayList<IOException>();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Mar 4 03:45:19 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
/** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
@@ -119,6 +120,10 @@ public abstract class FileOutputFormat<K
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
+
+ // get delegation token for outDir's file system
+ TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
+
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Mar 4 03:45:19 2011
@@ -38,10 +38,6 @@ import org.apache.hadoop.record.Utils;
public class SecureShuffleUtils {
public static final String HTTP_HEADER_URL_HASH = "UrlHash";
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
- /**
- * file name used on HDFS for generated job token
- */
- public static final String JOB_TOKEN_FILENAME = "jobToken";
/**
* Base64 encoded hash of msg
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar 4 03:45:19 2011
@@ -19,16 +19,23 @@
package org.apache.hadoop.mapreduce.security;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -40,6 +47,16 @@ import org.apache.hadoop.security.token.
public class TokenCache {
private static final Log LOG = LogFactory.getLog(TokenCache.class);
+ /**
+ * file name used on HDFS for generated job token
+ */
+ public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+ /**
+ * conf setting for job tokens cache file name
+ */
+
+ public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
private static TokenStorage tokenStorage;
@@ -72,6 +89,16 @@ public class TokenCache {
}
/**
+ *
+ * @param namenode
+ * @return delegation token
+ */
+ @SuppressWarnings("unchecked")
+ public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode) {
+ return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+ }
+
+ /**
* auxiliary method
* @return all the available tokens
*/
@@ -109,12 +136,12 @@ public class TokenCache {
* @throws IOException
*/
//@InterfaceAudience.Private
- public static TokenStorage loadTaskTokenStorage(JobConf conf)
+ public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
throws IOException {
if(tokenStorage != null)
return tokenStorage;
- tokenStorage = loadTokens(conf);
+ tokenStorage = loadTokens(fileName, conf);
return tokenStorage;
}
@@ -125,9 +152,8 @@ public class TokenCache {
* @throws IOException
*/
//@InterfaceAudience.Private
- public static TokenStorage loadTokens(JobConf conf)
+ public static TokenStorage loadTokens(String jobTokenFile, JobConf conf)
throws IOException {
- String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
Path localJobTokenFile = new Path (jobTokenFile);
FileSystem localFS = FileSystem.getLocal(conf);
FSDataInputStream in = localFS.open(localJobTokenFile);
@@ -135,9 +161,62 @@ public class TokenCache {
TokenStorage ts = new TokenStorage();
ts.readFields(in);
- LOG.info("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath()
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath()
+"; num of sec keys = " + ts.numberOfSecretKeys());
+ }
in.close();
return ts;
}
+
+ static String buildDTServiceName(URI uri) {
+ int port = uri.getPort();
+ if(port == -1)
+ port = NameNode.DEFAULT_PORT;
+
+ // build the service name string "/ip:port"
+ // for whatever reason using NetUtils.createSocketAddr(target).toString()
+ // returns "localhost/ip:port"
+ StringBuffer sb = new StringBuffer();
+ sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
+ return sb.toString();
+ }
+
+ /**
+ * get Delegation for each distinct dfs for given paths.
+ * @param ps
+ * @param conf
+ * @throws IOException
+ */
+ public static void obtainTokensForNamenodes(Path [] ps, Configuration conf)
+ throws IOException {
+ // get jobtracker principal id (for the renewer)
+ Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+
+ for(Path p: ps) {
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ if(fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ URI uri = fs.getUri();
+ String fs_addr = buildDTServiceName(uri);
+
+ // see if we already have the token
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ continue;
+ }
+ // get the token
+ token = dfs.getDelegationToken(jtCreds);
+ if(token==null)
+ throw new IOException("Token from " + fs_addr + " is null");
+
+ token.setService(new Text(fs_addr));
+ TokenCache.addDelegationToken(fs_addr, token);
+ LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
+ ";t.service="+token.getService());
+ }
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java Fri Mar 4 03:45:19 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.security.token.
//@InterfaceAudience.Private
public class TokenStorage implements Writable {
- private static final Text SHUFFLE_JOB_TOKEN = new Text("ShuffleJobToken");
+ private static final Text JOB_TOKEN = new Text("ShuffleJobToken");
private Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
@@ -74,7 +74,7 @@ public class TokenStorage implements Wri
*/
//@InterfaceAudience.Private
public void setJobToken(Token<? extends TokenIdentifier> t) {
- setToken(SHUFFLE_JOB_TOKEN, t);
+ setToken(JOB_TOKEN, t);
}
/**
@@ -83,7 +83,7 @@ public class TokenStorage implements Wri
*/
//@InterfaceAudience.Private
public Token<? extends TokenIdentifier> getJobToken() {
- return getToken(SHUFFLE_JOB_TOKEN);
+ return getToken(JOB_TOKEN);
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:45:19 2011
@@ -21,31 +21,28 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
+import junit.framework.TestCase;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
-
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
/**
* Test to verify localization of a job and localization of a task on a
@@ -207,7 +204,7 @@ public class TestTaskTrackerLocalization
if(!dir.exists())
assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
- File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+ File jobTokenFile = new File(dir, TokenCache.JOB_TOKEN_HDFS_FILE);
FileOutputStream fos = new FileOutputStream(jobTokenFile);
java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077141&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar 4 03:45:19 2011
@@ -0,0 +1,284 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+
+public class TestTokenCache {
+ private static final int NUM_OF_KEYS = 10;
+
+  /**
+   * Sleep job whose mapper additionally checks that the token cache is
+   * visible from inside the task.
+   */
+  static class MySleepJob extends SleepJob {
+    /**
+     * Checks the tokens and secret keys available through TokenCache, then
+     * delegates to the regular sleep mapper.
+     *
+     * @throws RuntimeException if the tokens or the secret keys are missing,
+     *         which fails the task and therefore the test
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value,
+        OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+        throws IOException {
+      // get token storage and a key
+      TokenStorage ts = TokenCache.getTokenStorage();
+      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      int dts_size = (dts == null) ? 0 : dts.size();
+
+      // key1 may be null here; it is only validated further down, so guard
+      // the diagnostic output against it instead of dying with an NPE
+      System.out.println("inside MAP: ts==NULL?=" + (ts==null) +
+          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) +
+          ";jobToken = " + (ts==null? "n/a":ts.getJobToken()) +
+          "; alias1 key=" + (key1==null? "n/a":new String(key1)) +
+          "; dts size= " + dts_size);
+
+      if(dts != null) {
+        for(Token<? extends TokenIdentifier> t : dts) {
+          System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
+        }
+      }
+
+      // a missing token collection counts as zero tokens
+      if(dts_size != 2) { // one job token and one delegation token
+        throw new RuntimeException("tokens are not available"); // fail the test
+      }
+
+      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("secret keys are not available"); // fail the test
+      }
+
+      super.map(key, value, output, reporter);
+    }
+
+    /**
+     * Builds the regular sleep job configuration and swaps in this class as
+     * the mapper so that the token checks run inside the map task.
+     */
+    public JobConf setupJobConf(int numMapper, int numReducer,
+        long mapSleepTime, int mapSleepCount,
+        long reduceSleepTime, int reduceSleepCount) {
+
+      JobConf job = super.setupJobConf(numMapper,numReducer,
+          mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount);
+
+      job.setMapperClass(MySleepJob.class);
+
+      return job;
+    }
+  }
+
+ // mini clusters created in setUp() and shut down in tearDown()
+ private static MiniMRCluster mrCluster;
+ private static MiniDFSCluster dfsCluster;
+ // local directory "sleepTest" under the test.build.data system property
+ // (falls back to /tmp when the property is not set)
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data","/tmp"), "sleepTest");
+ // JSON file holding the generated secret keys; written by createTokenFileJson()
+ private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json");
+ // passed to both mini cluster constructors in setUp()
+ private static int numSlaves = 1;
+ // job configuration shared by the tests; testTokenCache() replaces it
+ private static JobConf jConf;
+ // Jackson mapper used to write and read back the JSON key file
+ private static ObjectMapper mapper = new ObjectMapper();
+
+
+  /**
+   * Starts the mini DFS and MR clusters shared by all tests, then writes the
+   * JSON secret key file and checks that it can be read back.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    final Configuration baseConf = new Configuration();
+    dfsCluster = new MiniDFSCluster(baseConf, numSlaves, true, null);
+    jConf = new JobConf(baseConf);
+
+    // the URI of the mini DFS is handed to the MR cluster
+    final String dfsUri = dfsCluster.getFileSystem().getUri().toString();
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, dfsUri, 1, null, null, null,
+        jConf);
+
+    createTokenFileJson();
+    verifySecretKeysInJSONFile();
+  }
+
+  /**
+   * Shuts down whichever mini clusters were started (MR first, then DFS) and
+   * clears the static references to them.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    mrCluster = null;
+
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+    dfsCluster = null;
+  }
+
+  /**
+   * Creates the JSON file named by tokenFileName and fills it with
+   * NUM_OF_KEYS freshly generated, Base64 encoded HmacSHA1 keys stored under
+   * the aliases "alias0", "alias1", ...
+   *
+   * @throws IOException if the key generator is not available, the parent
+   *         directory cannot be created or the file cannot be written
+   */
+  private static void createTokenFileJson() throws IOException {
+    Map<String, String> map = new HashMap<String, String>();
+
+    try {
+      KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1");
+      for(int i=0; i<NUM_OF_KEYS; i++) {
+        // only the encoded bytes are needed, so no cast to a concrete key class
+        byte [] enc_key = kg.generateKey().getEncoded();
+        map.put("alias"+i, new String(Base64.encodeBase64(enc_key)));
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+
+    System.out.println("writing secret keys into " + tokenFileName);
+    File p = new File(tokenFileName.getParent().toString());
+    // mkdirs() also returns false when the directory already exists
+    if (!p.mkdirs() && !p.isDirectory()) {
+      throw new IOException("failed to create dir=" + p.getAbsolutePath());
+    }
+    // convert to JSON and save to the file; a write failure is propagated so
+    // that set up fails right here instead of later with a misleading error
+    mapper.writeValue(new File(tokenFileName.toString()), map);
+  }
+
+  /**
+   * Reads the JSON key file back and checks that it holds NUM_OF_KEYS entries.
+   *
+   * @throws IOException if the file cannot be read or parsed
+   */
+  @SuppressWarnings("unchecked")
+  private static void verifySecretKeysInJSONFile() throws IOException {
+    Map<String, String> map;
+    map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
+    // JUnit expects (message, expected, actual); the right order keeps the
+    // failure message truthful
+    assertEquals("didn't read JSON correctly", NUM_OF_KEYS, map.size());
+
+    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
+  }
+
+  /**
+   * run a distributed job and verify that TokenCache is available
+   * @throws IOException
+   */
+  @Test
+  public void testTokenCache() throws IOException {
+
+    System.out.println("running dist job");
+
+    // make sure JT starts
+    jConf = mrCluster.createJobConf();
+
+    // provide namenodes names for the job to get the delegation tokens for
+    // NOTE(review): the same namenode is listed twice - presumably to check
+    // that duplicates still yield a single delegation token; confirm
+    NameNode nn = dfsCluster.getNameNode();
+    URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
+    jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
+    // job tracker principal id..
+    jConf.set(JobContext.JOB_JOBTRACKER_ID, "jt_id");
+
+    // using argument to pass the file name
+    String[] args = {
+        "-tokenCacheFile", tokenFileName.toString(),
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+    };
+
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    // JUnit expects (message, expected, actual)
+    assertEquals("dist job res is not 0", 0, res);
+  }
+
+  /**
+   * run a local job and verify that TokenCache is available
+   * NOTE(review): jConf is a shared static that testTokenCache() replaces
+   * with the MR cluster's job conf, so whether this job really runs locally
+   * appears to depend on test execution order - confirm.
+   * @throws NoSuchAlgorithmException
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
+
+    System.out.println("running local job");
+    // this is local job
+    String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"};
+    // the key file is passed through the configuration instead of an argument
+    jConf.set("tokenCacheFile", tokenFileName.toString());
+
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("local Job failed");
+    }
+    // JUnit expects (message, expected, actual)
+    assertEquals("local job res is not 0", 0, res);
+  }
+
+  /**
+   * Requests delegation tokens for one qualified and one unqualified path and
+   * verifies that exactly one HDFS delegation token, keyed by the service
+   * name of the qualified path's file system, ends up in the token cache.
+   *
+   * @throws IOException if the tokens cannot be obtained
+   */
+  @Test
+  public void testGetTokensForNamenodes() throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+
+    Path p1 = new Path("file1");
+    Path p2 = new Path("file2");
+
+    p1 = fs.makeQualified(p1);
+    // do not qualify p2
+
+    TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
+
+    // this token is keyed by hostname:port key.
+    String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
+    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
+    System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt);
+
+    assertNotNull("Token for nn is null", nnt);
+
+    // verify the size
+    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    assertEquals("number of tokens is not 1", 1, tns.size());
+
+    boolean found = false;
+    for(Token<? extends TokenIdentifier> t: tns) {
+      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
+
+      if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+          t.getService().equals(new Text(fs_addr))) {
+        found = true;
+      }
+    }
+    // assert only after every token has been inspected; asserting inside the
+    // loop would fail on the first non-matching token even if a later one matches
+    assertTrue("didn't find token for " + p1 ,found);
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java Fri Mar 4 03:45:19 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapred.RecordRe
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -623,6 +624,12 @@ public class DistCp implements Tool {
private static void checkSrcPath(Configuration conf, List<Path> srcPaths
) throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
+
+ // get tokens for all the required FileSystems..
+ Path[] ps = new Path[srcPaths.size()];
+ ps = srcPaths.toArray(ps);
+ TokenCache.obtainTokensForNamenodes(ps, conf);
+
for (Path p : srcPaths) {
FileSystem fs = p.getFileSystem(conf);
if (!fs.exists(p)) {
@@ -1018,6 +1025,10 @@ public class DistCp implements Tool {
jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
FileSystem dstfs = args.dst.getFileSystem(conf);
+
+ // get tokens for all the required FileSystems..
+ TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+
boolean dstExists = dstfs.exists(args.dst);
boolean dstIsDir = false;
if (dstExists) {