You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/23 02:50:14 UTC
svn commit: r966918 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/security/
src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapreduce/security/
Author: ddas
Date: Fri Jul 23 00:50:13 2010
New Revision: 966918
URL: http://svn.apache.org/viewvc?rev=966918&view=rev
Log:
MAPREDUCE-1566. Adds a configuration attribute that job clients can use to specify a credentials file. The tokens in that file are passed to the job. Contributed by Jitendra Pandey and Owen O'Malley.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 00:50:13 2010
@@ -94,6 +94,10 @@ Trunk (unreleased changes)
MAPREDUCE-1733. Makes pipes applications secure. (Jitendra Pandey via ddas)
+ MAPREDUCE-1566. Adds a configuration attribute using which job clients can
+ specify a credentials file. The tokens from there will be passed to the job.
+ (Jitendra Pandey and Owen O'Malley via ddas)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul 23 00:50:13 2010
@@ -3687,8 +3687,10 @@ public class JobInProgress {
private void generateAndStoreTokens() throws IOException{
Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
- // we need to create this file using the jobtracker's filesystem
- FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
+
+ if (tokenStorage == null) {
+ tokenStorage = new Credentials();
+ }
//create JobToken file and write token to it
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
@@ -3697,15 +3699,10 @@ public class JobInProgress {
jobtracker.getJobTokenSecretManager());
token.setService(identifier.getJobId());
- // add this token to the tokenStorage
- if(tokenStorage == null)
- tokenStorage = new Credentials();
-
TokenCache.setJobToken(token, tokenStorage);
// write TokenStorage out
- tokenStorage.write(os);
- os.close();
+ tokenStorage.writeTokenStorageFile(keysFile, conf);
LOG.info("jobToken generated and stored with users keys in "
+ keysFile.toUri().getPath());
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Jul 23 00:50:13 2010
@@ -340,6 +340,13 @@ class JobSubmitter {
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
+ // load the binary file, if the user has one
+ String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ job.getCredentials().readTokenStorageFile(
+ new Path("file:///" + binaryTokenFilename), conf);
+ }
+
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -368,7 +375,9 @@ class JobSubmitter {
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
- jtFs.delete(submitJobDir, true);
+ if (jtFs != null && submitJobDir != null)
+ jtFs.delete(submitJobDir, true);
+
}
}
}
@@ -490,7 +499,7 @@ class JobSubmitter {
private void populateTokenCache(Configuration conf, Credentials credentials)
throws IOException {
// create TokenStorage object with user secretKeys
- String tokensFileName = conf.get("tokenCacheFile");
+ String tokensFileName = conf.get("mapreduce.job.credentials.json");
if(tokensFileName != null) {
LOG.info("loading user's secret keys from " + tokensFileName);
String localFileName = new Path(tokensFileName).toUri().getPath();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul 23 00:50:13 2010
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -91,8 +90,10 @@ public class TokenCache {
KerberosName jtKrbName = new KerberosName(conf.get(JTConfig.JT_USER_NAME,
""));
Text delegTokenRenewer = new Text(jtKrbName.getShortName());
-
+ boolean notReadFile = true;
for(Path p: ps) {
+ // TODO: connecting to the namenode is unnecessary when we
+ // already have the credentials in the file
FileSystem fs = FileSystem.get(p.toUri(), conf);
if(fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
@@ -106,6 +107,21 @@ public class TokenCache {
LOG.debug("DT for " + token.getService() + " is already present");
continue;
}
+ if (notReadFile) { //read the file only once
+ String binaryTokenFilename =
+ conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ credentials.readTokenStorageFile(new Path("file:///" +
+ binaryTokenFilename), conf);
+ }
+ notReadFile = false;
+ token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ continue;
+ }
+ }
// get the token
token = dfs.getDelegationToken(delegTokenRenewer);
if(token==null)
@@ -179,18 +195,16 @@ public class TokenCache {
@InterfaceAudience.Private
public static Credentials loadTokens(String jobTokenFile, JobConf conf)
throws IOException {
- Path localJobTokenFile = new Path (jobTokenFile);
- FileSystem localFS = FileSystem.getLocal(conf);
- FSDataInputStream in = localFS.open(localJobTokenFile);
+ Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
Credentials ts = new Credentials();
- ts.readFields(in);
+ ts.readTokenStorageFile(localJobTokenFile, conf);
if(LOG.isDebugEnabled()) {
LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath()
- +"; num of sec keys = " + ts.numberOfSecretKeys());
+ +"; num of sec keys = " + ts.numberOfSecretKeys() + " Number of tokens " +
+ ts.numberOfTokens());
}
- in.close();
return ts;
}
/**
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Jul 23 00:50:13 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
@@ -287,13 +288,9 @@ public class TestTaskTrackerLocalization
File dir = new File(TEST_ROOT_DIR, jobId.toString());
if(!dir.exists())
assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
-
- File jobTokenFile = new File(dir, TokenCache.JOB_TOKEN_HDFS_FILE);
- FileOutputStream fos = new FileOutputStream(jobTokenFile);
- java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
- Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
- jt.write(out); // writing empty file, we don't the keys for this test
- out.close();
+ // writing empty file, we don't need the keys for this test
+ new Credentials().writeTokenStorageFile(new Path("file:///" + dir,
+ TokenCache.JOB_TOKEN_HDFS_FILE), new Configuration());
}
@Override
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java?rev=966918&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java Fri Jul 23 00:50:13 2010
@@ -0,0 +1,200 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestBinaryTokenFile {
+
+ // my sleep class
+ static class MySleepMapper extends SleepJob.SleepMapper {
+ /**
+ * attempts to access tokenCache as from client
+ */
+ @Override
+ public void map(IntWritable key, IntWritable value, Context context)
+ throws IOException, InterruptedException {
+ // get token storage and a key
+ Credentials ts = context.getCredentials();
+ Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
+
+
+ if(dts.size() != 2) { // one job token and one delegation token
+ throw new RuntimeException("tokens are not available"); // fail the test
+ }
+
+ Token<? extends TokenIdentifier> dt = ts.getToken(new Text("Hdfs"));
+
+ //Verify that dt is same as the token in the file
+ String tokenFile = context.getConfiguration().get(
+ "mapreduce.job.credentials.binary");
+ Credentials cred = new Credentials();
+ cred.readTokenStorageStream(new DataInputStream(new FileInputStream(
+ tokenFile)));
+ for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+ if (!dt.equals(t)) {
+ throw new RuntimeException(
+ "Delegation token in job is not same as the token passed in file."
+ + " tokenInFile=" + t + ", dt=" + dt);
+ }
+ }
+
+ super.map(key, value, context);
+ }
+ }
+
+ class MySleepJob extends SleepJob {
+ @Override
+ public Job createJob(int numMapper, int numReducer,
+ long mapSleepTime, int mapSleepCount,
+ long reduceSleepTime, int reduceSleepCount)
+ throws IOException {
+ Job job = super.createJob(numMapper, numReducer,
+ mapSleepTime, mapSleepCount,
+ reduceSleepTime, reduceSleepCount);
+
+ job.setMapperClass(MySleepMapper.class);
+ //Populate tokens here because security is disabled.
+ setupBinaryTokenFile(job);
+ return job;
+ }
+
+ private void setupBinaryTokenFile(Job job) {
+ // Credentials in the job will not have delegation tokens
+ // because security is disabled. Fetch delegation tokens
+ // and store in binary token file.
+ try {
+ Credentials cred1 = new Credentials();
+ Credentials cred2 = new Credentials();
+ TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
+ job.getConfiguration());
+ for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
+ cred2.addToken(new Text("Hdfs"), t);
+ }
+ DataOutputStream os = new DataOutputStream(new FileOutputStream(
+ binaryTokenFileName.toString()));
+ cred2.writeTokenStorageToStream(os);
+ os.close();
+ job.getConfiguration().set("mapreduce.job.credentials.binary",
+ binaryTokenFileName.toString());
+ } catch (IOException e) {
+ Assert.fail("Exception " + e);
+ }
+ }
+ }
+
+ private static MiniMRCluster mrCluster;
+ private static MiniDFSCluster dfsCluster;
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data","/tmp"));
+ private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
+ private static int numSlaves = 1;
+ private static JobConf jConf;
+ private static Path p1;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+ jConf = new JobConf(conf);
+ mrCluster = new MiniMRCluster(0, 0, numSlaves,
+ dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
+ jConf);
+
+ dfsCluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
+ FileSystem fs = dfsCluster.getFileSystem();
+
+ p1 = new Path("file1");
+ p1 = fs.makeQualified(p1);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if(mrCluster != null)
+ mrCluster.shutdown();
+ mrCluster = null;
+ if(dfsCluster != null)
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ /**
+ * run a distributed job and verify that TokenCache is available
+ * @throws IOException
+ */
+ @Test
+ public void testBinaryTokenFile() throws IOException {
+
+ System.out.println("running dist job");
+
+ // make sure JT starts
+ jConf = mrCluster.createJobConf();
+
+ // provide namenodes names for the job to get the delegation tokens for
+ String nnUri = dfsCluster.getURI().toString();
+ jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
+ // job tracker principal id..
+ jConf.set(JTConfig.JT_USER_NAME, "jt_id");
+
+ // using argument to pass the file name
+ String[] args = {
+ "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+ };
+
+ int res = -1;
+ try {
+ res = ToolRunner.run(jConf, new MySleepJob(), args);
+ } catch (Exception e) {
+ System.out.println("Job failed with" + e.getLocalizedMessage());
+ e.printStackTrace(System.out);
+ fail("Job failed");
+ }
+ assertEquals("dist job res is not 0", res, 0);
+ }
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul 23 00:50:13 2010
@@ -248,7 +248,7 @@ public class TestTokenCache {
System.out.println("running local job");
// this is local job
String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"};
- jConf.set("tokenCacheFile", tokenFileName.toString());
+ jConf.set("mapreduce.job.credentials.json", tokenFileName.toString());
int res = -1;
try {