You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/09 22:27:13 UTC
svn commit: r962682 - in /hadoop/mapreduce/trunk: ./
src/contrib/mumak/src/java/org/apache/hadoop/mapred/
src/contrib/mumak/src/test/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/...
Author: ddas
Date: Fri Jul 9 20:27:12 2010
New Revision: 962682
URL: http://svn.apache.org/viewvc?rev=962682&view=rev
Log:
MAPREDUCE-1528. Incorporates the changes to the credentials API done in HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext. Contributed by Jitendra Pandey and Arun Murthy.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 9 20:27:12 2010
@@ -155,6 +155,10 @@ Trunk (unreleased changes)
MAPREDUCE-1820. Fix InputSampler to clone sampled keys. (Alex Kozlov via
cdouglas)
+ MAPREDUCE-1528. Incorporates the changes to the credentials API done in
+ HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext.
+ (Jitendra Pandey and Arun Murthy via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Fri Jul 9 20:27:12 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.tools.rumen.Job
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.mapred.SimulatorJobInProgress;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
/**
* {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
@@ -175,7 +175,7 @@ public class SimulatorJobTracker extends
@Override
public synchronized JobStatus submitJob(
- JobID jobId, String jobSubmitDir, TokenStorage ts)
+ JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException {
boolean loggingEnabled = LOG.isDebugEnabled();
if (loggingEnabled) {
Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Fri Jul 9 20:27:12 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.JobPr
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskReport;
@@ -82,7 +82,7 @@ public class MockSimulatorJobTracker imp
@Override
public JobStatus submitJob(
- JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
+ JobID jobId, String jobSubmitDir, Credentials ts) throws IOException {
JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
return status;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Fri Jul 9 20:27:12 2010
@@ -34,7 +34,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -78,11 +78,11 @@ class Child {
//load token cache storage
String jobTokenFile =
System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- TokenStorage ts =
- TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
- LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() +
+ Credentials credentials =
+ TokenCache.loadTokens(jobTokenFile, defaultConf);
+ LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
"; from file=" + jobTokenFile);
- Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
jt.setService(new Text(address.getAddress().getHostAddress() + ":"
+ address.getPort()));
UserGroupInformation current = UserGroupInformation.getCurrentUser();
@@ -93,6 +93,9 @@ class Child {
= UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
taskOwner.addToken(jt);
+ // Set the credentials
+ defaultConf.setCredentials(credentials);
+
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
@Override
@@ -177,7 +180,10 @@ class Child {
//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs(logLocation, taskid, isCleanup);
+
+ // Create the job-conf and set credentials
final JobConf job = new JobConf(task.getJobFile());
+ job.setCredentials(defaultConf.getCredentials());
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 9 20:27:12 2010
@@ -187,7 +187,7 @@ public abstract class FileInputFormat<K,
}
// get tokens for all the required FileSystems..
- TokenCache.obtainTokensForNamenodes(dirs, job);
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
// Whether we need to recursive look into the directory structure
boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jul 9 20:27:12 2010
@@ -118,7 +118,8 @@ public abstract class FileOutputFormat<K
setOutputPath(job, outDir);
// get delegation token for the outDir's file system
- TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] {outDir}, job);
// check its existence
if (fs.exists(outDir)) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Jul 9 20:27:12 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapred.lib.KeyF
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
@@ -322,6 +323,8 @@ public class JobConf extends Configurati
*/
public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
+ private Credentials credentials = new Credentials();
+
/**
* Configuration key to set the logging {@link Level} for the map task.
*
@@ -369,6 +372,12 @@ public class JobConf extends Configurati
*/
public JobConf(Configuration conf) {
super(conf);
+
+ if (conf instanceof JobConf) {
+ JobConf that = (JobConf)conf;
+ credentials = that.credentials;
+ }
+
checkAndWarnDeprecation();
}
@@ -416,6 +425,18 @@ public class JobConf extends Configurati
}
/**
+ * Get credentials for the job.
+ * @return credentials for the job
+ */
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ void setCredentials(Credentials credentials) {
+ this.credentials = credentials;
+ }
+
+ /**
* Get the user jar for the map-reduce job.
*
* @return the user jar for the map-reduce job.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul 9 20:27:12 2010
@@ -71,7 +71,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -157,7 +157,7 @@ public class JobInProgress {
JobPriority priority = JobPriority.NORMAL;
protected JobTracker jobtracker;
- protected TokenStorage tokenStorage;
+ protected Credentials tokenStorage;
JobHistory jobHistory;
@@ -390,7 +390,7 @@ public class JobInProgress {
public JobInProgress(JobTracker jobtracker,
final JobConf default_conf, int rCount,
JobInfo jobInfo,
- TokenStorage ts
+ Credentials ts
) throws IOException, InterruptedException {
this.restartCount = rCount;
this.jobId = JobID.downgrade(jobInfo.getJobID());
@@ -3695,7 +3695,7 @@ public class JobInProgress {
// add this token to the tokenStorage
if(tokenStorage == null)
- tokenStorage = new TokenStorage();
+ tokenStorage = new Credentials();
TokenCache.setJobToken(token, tokenStorage);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jul 9 20:27:12 2010
@@ -101,7 +101,7 @@ import org.apache.hadoop.net.ScriptBased
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -2991,7 +2991,7 @@ public class JobTracker implements MRCon
public synchronized
org.apache.hadoop.mapreduce.JobStatus
submitJob(org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir,
- TokenStorage ts
+ Credentials ts
) throws IOException, InterruptedException {
return submitJob(JobID.downgrade(jobId), jobSubmitDir, ts);
}
@@ -3004,12 +3004,11 @@ public class JobTracker implements MRCon
* of the JobTracker. But JobInProgress adds info that's useful for
* the JobTracker alone.
* @deprecated Use
- * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, TokenStorage)}
+ * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, Credentials)}
* instead
*/
@Deprecated
- public JobStatus submitJob(JobID jobId, String jobSubmitDir,
- TokenStorage ts)
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
return submitJob(jobId, 0, UserGroupInformation.getCurrentUser(),
@@ -3021,7 +3020,7 @@ public class JobTracker implements MRCon
*/
private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID,
int restartCount, UserGroupInformation ugi,
- String jobSubmitDir, boolean recovered, TokenStorage ts
+ String jobSubmitDir, boolean recovered, Credentials ts
)
throws IOException, InterruptedException {
@@ -3393,9 +3392,11 @@ public class JobTracker implements MRCon
}
/**
- * @see ClientProtocol#setJobPriority(org.apache.hadoop.mapreduce.JobID, String)
+ * Set the priority of a job
+ * @param jobid
+ * @param priority
+ * @throws IOException
*/
- @Override
public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID
jobid, String priority) throws IOException {
setJobPriority(JobID.downgrade(jobid), priority);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jul 9 20:27:12 2010
@@ -53,7 +53,7 @@ import org.apache.hadoop.mapreduce.filec
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -587,10 +587,12 @@ public class LocalJobRunner implements C
}
public org.apache.hadoop.mapreduce.JobStatus submitJob(
- org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir, TokenStorage ts)
- throws IOException {
- TokenCache.setTokenStorage(ts);
- return new Job(JobID.downgrade(jobid), jobSubmitDir).status;
+ org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
+ Credentials credentials) throws IOException {
+ Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
+ job.job.setCredentials(credentials);
+ return job.status;
+
}
public void killJob(org.apache.hadoop.mapreduce.JobID id) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jul 9 20:27:12 2010
@@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.lib.r
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -592,7 +593,7 @@ abstract public class Task implements Wr
} else {
return split;
}
- }
+ }
/**
* The communication thread handles communication with the parent (Task Tracker).
* It sends progress updates if progress has been made or if the task needs to
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jul 9 20:27:12 2010
@@ -82,7 +82,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -1036,7 +1036,7 @@ public class TaskTracker
String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
- TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
if (jt != null) { //could be null in the case of some unit tests
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Jul 9 20:27:12 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.Credentials;
/**
* A read-only view of the job that is provided to the tasks while they
@@ -43,6 +44,12 @@ public interface JobContext extends MRJo
public Configuration getConfiguration();
/**
+ * Get credentials for the job.
+ * @return credentials for the job
+ */
+ public Credentials getCredentials();
+
+ /**
* Get the unique ID for the job.
* @return the object with the job id
*/
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Jul 9 20:27:12 2010
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.filec
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -240,7 +241,8 @@ class JobSubmitter {
// set the public/private visibility of the archives and files
TrackerDistributedCacheManager.determineCacheVisibilities(conf);
// get DelegationToken for each cached file
- TrackerDistributedCacheManager.getDelegationTokens(conf);
+ TrackerDistributedCacheManager.getDelegationTokens(conf, job
+ .getCredentials());
}
private URI getPathURI(Path destPath, String fragment)
@@ -335,7 +337,8 @@ class JobSubmitter {
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
- TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, conf);
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { submitJobDir }, conf);
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -354,9 +357,9 @@ class JobSubmitter {
//
// Now, actually submit the job (using the submit name)
//
- populateTokenCache(conf);
+ populateTokenCache(conf, job.getCredentials());
status = submitClient.submitJob(
- jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
+ jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
@@ -484,7 +487,8 @@ class JobSubmitter {
// get secret keys and tokens and store them into TokenCache
@SuppressWarnings("unchecked")
- private void populateTokenCache(Configuration conf) throws IOException{
+ private void populateTokenCache(Configuration conf, Credentials credentials)
+ throws IOException {
// create TokenStorage object with user secretKeys
String tokensFileName = conf.get("tokenCacheFile");
if(tokensFileName != null) {
@@ -499,7 +503,8 @@ class JobSubmitter {
mapper.readValue(new File(localFileName), Map.class);
for(Map.Entry<String, String> ent: nm.entrySet()) {
- TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+ credentials.addSecretKey(new Text(ent.getKey()), ent.getValue()
+ .getBytes());
}
} catch (JsonMappingException e) {
json_error = true;
@@ -518,7 +523,7 @@ class JobSubmitter {
for(int i=0; i< nameNodes.length; i++) {
ps[i] = new Path(nameNodes[i]);
}
- TokenCache.obtainTokensForNamenodes(ps, conf);
+ TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Fri Jul 9 20:27:12 2010
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -694,9 +695,11 @@ public class TrackerDistributedCacheMana
/**
* For each archive or cache file - get the corresponding delegation token
* @param job
+ * @param credentials
* @throws IOException
*/
- public static void getDelegationTokens(Configuration job) throws IOException {
+ public static void getDelegationTokens(Configuration job,
+ Credentials credentials) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
URI[] tfiles = DistributedCache.getCacheFiles(job);
@@ -716,7 +719,7 @@ public class TrackerDistributedCacheMana
}
}
- TokenCache.obtainTokensForNamenodes(ps, job);
+ TokenCache.obtainTokensForNamenodes(credentials, ps, job);
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Fri Jul 9 20:27:12 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
/**
* A simple wrapper class that delegates most of its functionality to the
@@ -305,4 +306,9 @@ class ChainMapContextImpl<KEYIN, VALUEIN
base.progress();
}
+ @Override
+ public Credentials getCredentials() {
+ return base.getCredentials();
+ }
+
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Fri Jul 9 20:27:12 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
/**
* A simple wrapper class that delegates most of its functionality to the
@@ -297,4 +298,9 @@ class ChainReduceContextImpl<KEYIN, VALU
public void progress() {
base.progress();
}
+
+ @Override
+ public Credentials getCredentials() {
+ return base.getCredentials();
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 9 20:27:12 2010
@@ -205,7 +205,8 @@ public abstract class FileInputFormat<K,
}
// get tokens for all the required FileSystems..
- TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
+ job.getConfiguration());
List<IOException> errors = new ArrayList<IOException>();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Fri Jul 9 20:27:12 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
/**
* A {@link Mapper} which wraps a given one to allow custom
@@ -306,5 +307,10 @@ public class WrappedMapper<KEYIN, VALUEI
public String getUser() {
return mapContext.getUser();
}
+
+ @Override
+ public Credentials getCredentials() {
+ return mapContext.getCredentials();
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Jul 9 20:27:12 2010
@@ -134,8 +134,9 @@ public static final String OUTDIR = "map
}
// get delegation token for outDir's file system
- TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
-
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { outDir }, job.getConfiguration());
+
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Fri Jul 9 20:27:12 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Parti
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
/**
* A {@link Reducer} which wraps a given one to allow for custom
@@ -310,5 +311,10 @@ public class WrappedReducer<KEYIN, VALUE
public String getUser() {
return reduceContext.getUser();
}
+
+ @Override
+ public Credentials getCredentials() {
+ return reduceContext.getCredentials();
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Jul 9 20:27:12 2010
@@ -39,8 +39,8 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
@@ -107,8 +107,9 @@ public interface ClientProtocol extends
* Version 31: Added TokenStorage to submitJob
* Version 32: Added delegation tokens (add, renew, cancel)
* Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
+ * Version 34: Modified submitJob to use Credentials instead of TokenStorage.
*/
- public static final long versionID = 33L;
+ public static final long versionID = 34L;
/**
* Allocate a name for the job.
@@ -121,8 +122,8 @@ public interface ClientProtocol extends
* Submit a Job for execution. Returns the latest profile for
* that job.
*/
- public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
- throws IOException, InterruptedException;
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException;
/**
* Get the current status of the cluster
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul 9 20:27:12 2010
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
@@ -57,60 +57,36 @@ public class TokenCache {
private static final Log LOG = LogFactory.getLog(TokenCache.class);
- private static TokenStorage tokenStorage;
/**
* auxiliary method to get user's secret keys..
* @param alias
* @return secret key from the storage
*/
- public static byte[] getSecretKey(Text alias) {
- if(tokenStorage == null)
+ public static byte[] getSecretKey(Credentials credentials, Text alias) {
+ if(credentials == null)
return null;
- return tokenStorage.getSecretKey(alias);
+ return credentials.getSecretKey(alias);
}
/**
- * auxiliary methods to store user' s secret keys
- * @param alias
- * @param key
- */
- public static void addSecretKey(Text alias, byte[] key) {
- getTokenStorage().addSecretKey(alias, key);
- }
-
- /**
- * auxiliary method to add a delegation token
- */
- public static void addDelegationToken(
- String namenode, Token<? extends TokenIdentifier> t) {
- getTokenStorage().addToken(new Text(namenode), t);
- }
-
- /**
- * auxiliary method
- * @return all the available tokens
- */
- public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
- return getTokenStorage().getAllTokens();
- }
- /**
* Convenience method to obtain delegation tokens from namenodes
* corresponding to the paths passed.
+ * @param credentials
* @param ps array of paths
* @param conf configuration
* @throws IOException
*/
- public static void obtainTokensForNamenodes(Path [] ps, Configuration conf)
- throws IOException {
+ public static void obtainTokensForNamenodes(Credentials credentials,
+ Path[] ps, Configuration conf) throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
- obtainTokensForNamenodesInternal(ps, conf);
+ obtainTokensForNamenodesInternal(credentials, ps, conf);
}
- static void obtainTokensForNamenodesInternal(Path [] ps, Configuration conf)
- throws IOException {
+ static void obtainTokensForNamenodesInternal(Credentials credentials,
+ Path[] ps, Configuration conf) throws IOException {
// get jobtracker principal id (for the renewer)
Text jtCreds = new Text(conf.get(JTConfig.JT_USER_NAME, ""));
@@ -123,7 +99,7 @@ public class TokenCache {
// see if we already have the token
Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(fs_addr);
+ TokenCache.getDelegationToken(credentials, fs_addr);
if(token != null) {
LOG.debug("DT for " + token.getService() + " is already present");
continue;
@@ -134,7 +110,7 @@ public class TokenCache {
throw new IOException("Token from " + fs_addr + " is null");
token.setService(new Text(fs_addr));
- TokenCache.addDelegationToken(fs_addr, token);
+ credentials.addToken(new Text(fs_addr), token);
LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
";t.service="+token.getService());
}
@@ -161,50 +137,10 @@ public class TokenCache {
*/
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
- public static Token<DelegationTokenIdentifier>
- getDelegationToken(String namenode) {
- return (Token<DelegationTokenIdentifier>)getTokenStorage().
- getToken(new Text(namenode));
- }
-
- /**
- * @return TokenStore object
- */
- @InterfaceAudience.Private
- public static TokenStorage getTokenStorage() {
- if(tokenStorage==null)
- tokenStorage = new TokenStorage();
-
- return tokenStorage;
- }
-
- /**
- * sets TokenStorage
- * @param ts
- */
- @InterfaceAudience.Private
- public static void setTokenStorage(TokenStorage ts) {
- if(tokenStorage != null)
- LOG.warn("Overwriting existing token storage with # keys=" +
- tokenStorage.numberOfSecretKeys());
- tokenStorage = ts;
- }
-
- /**
- * load token storage and stores it
- * @param conf
- * @return Loaded TokenStorage object
- * @throws IOException
- */
- @InterfaceAudience.Private
- public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
- throws IOException {
- if(tokenStorage != null)
- return tokenStorage;
-
- tokenStorage = loadTokens(fileName, conf);
-
- return tokenStorage;
+ public static Token<DelegationTokenIdentifier> getDelegationToken(
+ Credentials credentials, String namenode) {
+ return (Token<DelegationTokenIdentifier>) credentials.getToken(new Text(
+ namenode));
}
/**
@@ -213,13 +149,13 @@ public class TokenCache {
* @throws IOException
*/
@InterfaceAudience.Private
- public static TokenStorage loadTokens(String jobTokenFile, JobConf conf)
+ public static Credentials loadTokens(String jobTokenFile, JobConf conf)
throws IOException {
Path localJobTokenFile = new Path (jobTokenFile);
FileSystem localFS = FileSystem.getLocal(conf);
FSDataInputStream in = localFS.open(localJobTokenFile);
- TokenStorage ts = new TokenStorage();
+ Credentials ts = new Credentials();
ts.readFields(in);
if(LOG.isDebugEnabled()) {
@@ -235,8 +171,8 @@ public class TokenCache {
*/
@InterfaceAudience.Private
public static void setJobToken(Token<? extends TokenIdentifier> t,
- TokenStorage ts) {
- ts.addToken(JOB_TOKEN, t);
+ Credentials credentials) {
+ credentials.addToken(JOB_TOKEN, t);
}
/**
*
@@ -244,8 +180,8 @@ public class TokenCache {
*/
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
- public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
- return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
+ public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+ return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
static String buildDTServiceName(URI uri) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Jul 9 20:27:12 2010
@@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -122,7 +122,7 @@ public class DelegationTokenRenewal {
@SuppressWarnings("unchecked")
public static synchronized void registerDelegationTokensForRenewal(
- JobID jobId, TokenStorage ts, Configuration conf) {
+ JobID jobId, Credentials ts, Configuration conf) {
if(ts==null)
return; //nothing to add
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Fri Jul 9 20:27:12 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.filec
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -55,10 +56,12 @@ public class JobContextImpl implements J
* The UserGroupInformation object that has a reference to the current user
*/
protected UserGroupInformation ugi;
+ protected final Credentials credentials;
public JobContextImpl(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
+ this.credentials = this.conf.getCredentials();
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
@@ -404,5 +407,9 @@ public class JobContextImpl implements J
public String getUser() {
return conf.getUser();
}
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul 9 20:27:12 2010
@@ -46,8 +46,9 @@ import org.apache.hadoop.mapred.MiniMRCl
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
@@ -56,6 +57,7 @@ import org.codehaus.jackson.map.ObjectMa
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.Assert;
public class TestTokenCache {
private static final int NUM_OF_KEYS = 10;
@@ -69,22 +71,13 @@ public class TestTokenCache {
public void map(IntWritable key, IntWritable value, Context context)
throws IOException, InterruptedException {
// get token storage and a key
- TokenStorage ts = TokenCache.getTokenStorage();
- byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
- Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+ Credentials ts = context.getCredentials();
+ byte[] key1 = ts.getSecretKey(new Text("alias1"));
+ Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
int dts_size = 0;
if(dts != null)
dts_size = dts.size();
- System.out.println("inside MAP: ts==NULL?=" + (ts==null) +
- "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) +
- ";jobToken = " + (ts==null? "n/a":TokenCache.getJobToken(ts)) +
- "; alias1 key=" + new String(key1) +
- "; dts size= " + dts_size);
-
- for(Token<? extends TokenIdentifier> t : dts) {
- System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
- }
if(dts.size() != 2) { // one job token and one delegation token
throw new RuntimeException("tokens are not available"); // fail the test
@@ -109,8 +102,29 @@ public class TestTokenCache {
reduceSleepTime, reduceSleepCount);
job.setMapperClass(MySleepMapper.class);
+ //Populate tokens here because security is disabled.
+ populateTokens(job);
return job;
}
+
+ private void populateTokens(Job job) {
+ // Credentials in the job will not have delegation tokens
+ // because security is disabled. Fetch delegation tokens
+ // and populate the credentials in the job.
+ try {
+ Credentials ts = job.getCredentials();
+ Path p1 = new Path("file1");
+ p1 = p1.getFileSystem(job.getConfiguration()).makeQualified(p1);
+ Credentials cred = new Credentials();
+ TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 },
+ job.getConfiguration());
+ for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+ ts.addToken(new Text("Hdfs"), t);
+ }
+ } catch (IOException e) {
+ Assert.fail("Exception " + e);
+ }
+ }
}
private static MiniMRCluster mrCluster;
@@ -142,9 +156,6 @@ public class TestTokenCache {
p2 = new Path("file2");
p1 = fs.makeQualified(p1);
- // do not qualify p2
- TokenCache.setTokenStorage(new TokenStorage());
- TokenCache.obtainTokensForNamenodesInternal(new Path [] {p1, p2}, jConf);
}
@AfterClass
@@ -173,7 +184,6 @@ public class TestTokenCache {
throw new IOException(e);
}
- System.out.println("writing secret keys into " + tokenFileName);
try {
File p = new File(tokenFileName.getParent().toString());
p.mkdirs();
@@ -190,8 +200,6 @@ public class TestTokenCache {
Map<String, String> map;
map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
-
- System.out.println("file " + tokenFileName + " verified; size="+ map.size());
}
/**
@@ -257,20 +265,23 @@ public class TestTokenCache {
public void testGetTokensForNamenodes() throws IOException {
FileSystem fs = dfsCluster.getFileSystem();
+ Credentials credentials = new Credentials();
+ TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
+ p2 }, jConf);
+
// this token is keyed by hostname:port key.
String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
- Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
+ Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
+ credentials, fs_addr);
System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt);
assertNotNull("Token for nn is null", nnt);
// verify the size
- Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+ Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
assertEquals("number of tokens is not 1", 1, tns.size());
boolean found = false;
for(Token<? extends TokenIdentifier> t: tns) {
- System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
-
if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
t.getService().equals(new Text(fs_addr))) {
found = true;
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java?rev=962682&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java Fri Jul 9 20:27:12 2010
@@ -0,0 +1,296 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.EmptyInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+@SuppressWarnings("deprecation")
+public class TestTokenCacheOldApi {
+ private static final int NUM_OF_KEYS = 10;
+
+ // my dummy job class - adds a check for the token cache
+ static class MyDummyJob extends Configured implements Tool,
+ Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
+ Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+ Partitioner<IntWritable, NullWritable> {
+ Credentials ts;
+
+ public void configure(JobConf job) {
+ }
+
+ /**
+ * Attempts to access the token cache as seen from the client.
+ */
+ public void map(IntWritable key, IntWritable value,
+ OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+ throws IOException {
+ // get the credentials and a secret key
+ byte[] key1 = ts.getSecretKey(new Text("alias1"));
+ Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
+ int dts_size = 0;
+ if(dts != null)
+ dts_size = dts.size();
+
+ if(dts.size() != 2) { // one job token and one delegation token
+ throw new RuntimeException("tokens are not available"); // fail the test
+ }
+
+ if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+ throw new RuntimeException("secret keys are not available"); // fail the test
+ }
+
+ output.collect(new IntWritable(1), NullWritable.get());
+ }
+
+ public JobConf setupJobConf() {
+
+ JobConf job = new JobConf(getConf(), MyDummyJob.class);
+ job.setNumMapTasks(1);
+ job.setNumReduceTasks(1);
+ job.setMapperClass(MyDummyJob.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setReducerClass(MyDummyJob.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setInputFormat(EmptyInputFormat.class);
+ job.setPartitionerClass(MyDummyJob.class);
+ job.setSpeculativeExecution(false);
+ job.setJobName("Sleep job");
+ populateTokens(job);
+ return job;
+ }
+
+ private void populateTokens(JobConf job) {
+ // Credentials in the job will not have delegation tokens
+ // because security is disabled. Fetch delegation tokens
+ // and populate the credentials in the job.
+ try {
+ Credentials ts = job.getCredentials();
+ Path p1 = new Path("file1");
+ p1 = p1.getFileSystem(job).makeQualified(p1);
+ Credentials cred = new Credentials();
+ TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 },
+ job);
+ for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+ ts.addToken(new Text("Hdfs"), t);
+ }
+ } catch (IOException e) {
+ Assert.fail("Exception " + e);
+ }
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void reduce(IntWritable key, Iterator<NullWritable> values,
+ OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+ throws IOException {
+ return;
+ }
+
+ public int getPartition(IntWritable key, NullWritable value,
+ int numPartitions) {
+ return key.get() % numPartitions;
+ }
+
+ public int run(String[] args) throws Exception {
+ JobConf job = setupJobConf();
+ JobClient.runJob(job);
+ return 0;
+ }
+ }
+
+ private static MiniMRCluster mrCluster;
+ private static MiniDFSCluster dfsCluster;
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data","/tmp"), "sleepTest");
+ private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json");
+ private static int numSlaves = 1;
+ private static JobConf jConf;
+ private static ObjectMapper mapper = new ObjectMapper();
+ private static Path p1;
+ private static Path p2;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+ jConf = new JobConf(conf);
+ mrCluster = new MiniMRCluster(0, 0, numSlaves,
+ dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
+ jConf);
+
+ createTokenFileJson();
+ verifySecretKeysInJSONFile();
+ dfsCluster.getNamesystem()
+ .getDelegationTokenSecretManager().startThreads();
+ FileSystem fs = dfsCluster.getFileSystem();
+
+ p1 = new Path("file1");
+ p2 = new Path("file2");
+ p1 = fs.makeQualified(p1);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if(mrCluster != null)
+ mrCluster.shutdown();
+ mrCluster = null;
+ if(dfsCluster != null)
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ // create a JSON file and put some secret keys into it
+ private static void createTokenFileJson() throws IOException {
+ Map<String, String> map = new HashMap<String, String>();
+
+ try {
+ KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1");
+ for(int i=0; i<NUM_OF_KEYS; i++) {
+ SecretKeySpec key = (SecretKeySpec) kg.generateKey();
+ byte [] enc_key = key.getEncoded();
+ map.put("alias"+i, new String(Base64.encodeBase64(enc_key)));
+
+ }
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ File p = new File(tokenFileName.getParent().toString());
+ p.mkdirs();
+ // convert to JSON and save to the file
+ mapper.writeValue(new File(tokenFileName.toString()), map);
+
+ } catch (Exception e) {
+ System.out.println("failed with :" + e.getLocalizedMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void verifySecretKeysInJSONFile() throws IOException {
+ Map<String, String> map;
+ map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
+ assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
+ }
+
+ /**
+ * run a distributed job and verify that TokenCache is available
+ * @throws IOException
+ */
+ @Test
+ public void testTokenCache() throws IOException {
+ // make sure JT starts
+ jConf = mrCluster.createJobConf();
+
+ // provide namenodes names for the job to get the delegation tokens for
+ //String nnUri = dfsCluster.getNameNode().getUri(namenode).toString();
+ NameNode nn = dfsCluster.getNameNode();
+ URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
+ jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
+ // job tracker principal id..
+ jConf.set(JobTracker.JT_USER_NAME, "jt_id");
+
+ // using argument to pass the file name
+ String[] args = {
+ "-tokenCacheFile", tokenFileName.toString(),
+ "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+ };
+
+ int res = -1;
+ try {
+ res = ToolRunner.run(jConf, new MyDummyJob(), args);
+ } catch (Exception e) {
+ System.out.println("Job failed with" + e.getLocalizedMessage());
+ e.printStackTrace(System.out);
+ Assert.fail("Job failed");
+ }
+ assertEquals("dist job res is not 0", res, 0);
+ }
+
+ /**
+ * run a local job and verify that TokenCache is available
+ * @throws NoSuchAlgorithmException
+ * @throws IOException
+ */
+ @Test
+ public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
+ // this is local job
+ String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"};
+ jConf.set("mapreduce.job.credentials.json", tokenFileName.toString());
+
+ int res = -1;
+ try {
+ res = ToolRunner.run(jConf, new MyDummyJob(), args);
+ } catch (Exception e) {
+ System.out.println("Job failed with" + e.getLocalizedMessage());
+ e.printStackTrace(System.out);
+ fail("local Job failed");
+ }
+ assertEquals("local job res is not 0", res, 0);
+ }
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Jul 9 20:27:12 2010
@@ -38,7 +38,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
@@ -238,7 +238,7 @@ public class TestDelegationTokenRenewal
String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
- TokenStorage ts = new TokenStorage();
+ Credentials ts = new Credentials();
// register the token for renewal
ts.addToken(new Text(nn1), token1);
@@ -273,7 +273,7 @@ public class TestDelegationTokenRenewal
// add another token ( that expires in 2 secs). Then remove it, before
// time is up.
// Wait for 3 secs , and make sure no renews were called
- ts = new TokenStorage();
+ ts = new Credentials();
MyToken token4 = dfs.getDelegationToken(new Text("user4"));
//to cause this one to be set for renew in 2 secs
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Fri Jul 9 20:27:12 2010
@@ -739,15 +739,15 @@ public class DistCp implements Tool {
}
/** Sanity check for srcPath */
- private static void checkSrcPath(Configuration conf, List<Path> srcPaths
- ) throws IOException {
+ private static void checkSrcPath(Configuration conf, List<Path> srcPaths,
+ JobConf jobConf) throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
List<Path> unglobbed = new LinkedList<Path>();
// get tokens for all the required FileSystems..
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
- TokenCache.obtainTokensForNamenodes(ps, conf);
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
for (Path p : srcPaths) {
@@ -779,9 +779,10 @@ public class DistCp implements Tool {
if (!args.dryrun || args.flags.contains(Options.UPDATE)) {
LOG.info("destPath=" + args.dst);
}
- checkSrcPath(conf, args.srcs);
JobConf job = createJobConf(conf);
+
+ checkSrcPath(conf, args.srcs, job);
if (args.preservedAttributes != null) {
job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
}
@@ -1230,7 +1231,8 @@ public class DistCp implements Tool {
FileSystem dstfs = args.dst.getFileSystem(conf);
// get tokens for all the required FileSystems..
- TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(),
+ new Path[] {args.dst}, conf);
boolean dstExists = dstfs.exists(args.dst);